In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
spark = (
    SparkSession.builder
    .master('local')
    .appName('pySparkMLib1')
    .getOrCreate()
)

**CENARIO 9**


In [3]:
df = spark.read.csv('C:/Users/Belilo/Desktop/SparkJupyter/datasets/ctu13/cenario9.csv', header=True,inferSchema=True)

In [4]:
df.show(5)

+----------+---+-----+-----+-----+-----+-----------+----+----+-------+---------+---------+
|     Label|Dir|Sport|Proto|Dport|State|        Dur|sTos|dTos|TotPkts| TotBytes| SrcBytes|
+----------+---+-----+-----+-----+-----+-----------+----+----+-------+---------+---------+
|Background|<->| 6881|  udp| 6881|  CON|1823.088379|   0|   0|      2|      214|      107|
|Background|<->| 6881|  udp| 6881|  CON|2005.431641|   0|   0|      2|      214|      107|
|Background|<->| 6881|  udp| 6881|  CON|1973.646729|   0|   0|      2|      214|      107|
|Background|<->|35248|  udp|16200|  CON|3599.997803|   0|   0| 409227|207547419|182657149|
|Background|<?>|   80|  tcp|59067|RA_PA|2059.387451|   0|   0| 224275|266462578|  3199174|
+----------+---+-----+-----+-----+-----+-----------+----+----+-------+---------+---------+
only showing top 5 rows



**Contar todos dados do datset**


In [5]:
df.count()

2087508

**Verificar os tipos de dados em cada coluna**

In [6]:
df.printSchema()

root
 |-- Label: string (nullable = true)
 |-- Dir: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Proto: string (nullable = true)
 |-- Dport: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Dur: double (nullable = true)
 |-- sTos: integer (nullable = true)
 |-- dTos: integer (nullable = true)
 |-- TotPkts: integer (nullable = true)
 |-- TotBytes: long (nullable = true)
 |-- SrcBytes: integer (nullable = true)



**Remonear colunas**

In [7]:
df = df.withColumnRenamed('Proto','protocolo').withColumnRenamed('Sport','PortaOrigem').withColumnRenamed('Dir','Direcao')\
        .withColumnRenamed('Dport','PortaDestino').withColumnRenamed('State','Estado').withColumnRenamed('Dur','Duracao')\
        .withColumnRenamed('Label',"Rotulo")
df.show()

+----------+-------+-----------+---------+------------+-------+-----------+----+----+-------+---------+---------+
|    Rotulo|Direcao|PortaOrigem|protocolo|PortaDestino| Estado|    Duracao|sTos|dTos|TotPkts| TotBytes| SrcBytes|
+----------+-------+-----------+---------+------------+-------+-----------+----+----+-------+---------+---------+
|Background|    <->|       6881|      udp|        6881|    CON|1823.088379|   0|   0|      2|      214|      107|
|Background|    <->|       6881|      udp|        6881|    CON|2005.431641|   0|   0|      2|      214|      107|
|Background|    <->|       6881|      udp|        6881|    CON|1973.646729|   0|   0|      2|      214|      107|
|Background|    <->|      35248|      udp|       16200|    CON|3599.997803|   0|   0| 409227|207547419|182657149|
|Background|    <?>|         80|      tcp|       59067|  RA_PA|2059.387451|   0|   0| 224275|266462578|  3199174|
|Background|    <?>|       1176|      tcp|          80| FPA_FA|  14.998429|   0|   0|   

**Mostrar Nomes das Colunas**

In [8]:
df.columns

['Rotulo',
 'Direcao',
 'PortaOrigem',
 'protocolo',
 'PortaDestino',
 'Estado',
 'Duracao',
 'sTos',
 'dTos',
 'TotPkts',
 'TotBytes',
 'SrcBytes']

In [9]:
df.show()
               

+----------+-------+-----------+---------+------------+-------+-----------+----+----+-------+---------+---------+
|    Rotulo|Direcao|PortaOrigem|protocolo|PortaDestino| Estado|    Duracao|sTos|dTos|TotPkts| TotBytes| SrcBytes|
+----------+-------+-----------+---------+------------+-------+-----------+----+----+-------+---------+---------+
|Background|    <->|       6881|      udp|        6881|    CON|1823.088379|   0|   0|      2|      214|      107|
|Background|    <->|       6881|      udp|        6881|    CON|2005.431641|   0|   0|      2|      214|      107|
|Background|    <->|       6881|      udp|        6881|    CON|1973.646729|   0|   0|      2|      214|      107|
|Background|    <->|      35248|      udp|       16200|    CON|3599.997803|   0|   0| 409227|207547419|182657149|
|Background|    <?>|         80|      tcp|       59067|  RA_PA|2059.387451|   0|   0| 224275|266462578|  3199174|
|Background|    <?>|       1176|      tcp|          80| FPA_FA|  14.998429|   0|   0|   

**Selecionar Colunas, atribuir ao novo datafreme**

In [10]:
df = df.select(col('Duracao'),col('protocolo'),col('PortaOrigem'),col('Direcao'),col('PortaDestino'),col('Estado')\
         ,col('sTos'),col('dTos'),col('TotPkts'),col('TotBytes'),col('SrcBytes'),col('Rotulo'))

df.show(5)

+-----------+---------+-----------+-------+------------+------+----+----+-------+---------+---------+----------+
|    Duracao|protocolo|PortaOrigem|Direcao|PortaDestino|Estado|sTos|dTos|TotPkts| TotBytes| SrcBytes|    Rotulo|
+-----------+---------+-----------+-------+------------+------+----+----+-------+---------+---------+----------+
|1823.088379|      udp|       6881|    <->|        6881|   CON|   0|   0|      2|      214|      107|Background|
|2005.431641|      udp|       6881|    <->|        6881|   CON|   0|   0|      2|      214|      107|Background|
|1973.646729|      udp|       6881|    <->|        6881|   CON|   0|   0|      2|      214|      107|Background|
|3599.997803|      udp|      35248|    <->|       16200|   CON|   0|   0| 409227|207547419|182657149|Background|
|2059.387451|      tcp|         80|    <?>|       59067| RA_PA|   0|   0| 224275|266462578|  3199174|Background|
+-----------+---------+-----------+-------+------------+------+----+----+-------+---------+-----

**Alterar o tipo de dados**

In [11]:
df.printSchema()

root
 |-- Duracao: double (nullable = true)
 |-- protocolo: string (nullable = true)
 |-- PortaOrigem: string (nullable = true)
 |-- Direcao: string (nullable = true)
 |-- PortaDestino: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- sTos: integer (nullable = true)
 |-- dTos: integer (nullable = true)
 |-- TotPkts: integer (nullable = true)
 |-- TotBytes: long (nullable = true)
 |-- SrcBytes: integer (nullable = true)
 |-- Rotulo: string (nullable = true)



In [12]:
df = df.withColumn('PortaOrigem',col('Portaorigem').cast(IntegerType()))\
        .withColumn('PortaDestino',col('PortaDestino').cast(IntegerType()))

df.printSchema()

root
 |-- Duracao: double (nullable = true)
 |-- protocolo: string (nullable = true)
 |-- PortaOrigem: integer (nullable = true)
 |-- Direcao: string (nullable = true)
 |-- PortaDestino: integer (nullable = true)
 |-- Estado: string (nullable = true)
 |-- sTos: integer (nullable = true)
 |-- dTos: integer (nullable = true)
 |-- TotPkts: integer (nullable = true)
 |-- TotBytes: long (nullable = true)
 |-- SrcBytes: integer (nullable = true)
 |-- Rotulo: string (nullable = true)



**Tratando Valores Nulos / dados faltante**

**Verificar Valores Nulos em cada coluna {Nome_coluna : Qtd de Valores nulos}**

In [13]:
for coluna in df.columns:
    print(coluna," : ", df.filter(df[coluna].isNull()).count())

Duracao  :  0
protocolo  :  0
PortaOrigem  :  24470
Direcao  :  0
PortaDestino  :  23862
Estado  :  3
sTos  :  9031
dTos  :  173232
TotPkts  :  0
TotBytes  :  0
SrcBytes  :  0
Rotulo  :  0


**Preencher Valores nulos apartir do calculo da moda (a frequencia dos dados)**

In [14]:
# A impressao dos valores eh so para mostrar qual eh o valor mais frequente (a moda) em cda uma das colunas que posuem dados nulos
print("Nome da coluna : O valor com maior frequencia (Moda)")

for coluna in df.columns:
    if df.filter(df[coluna].isNull()).count() != 0:
        count_mode_val = df.groupBy(coluna).count().filter(col(coluna).isNotNull())\
                            .agg(max("count")).collect()[0][0]
        mode_val = df.groupBy(coluna).count().filter(col(coluna).isNotNull())\
                            .filter(col("count") == count_mode_val).select(coluna).collect()[0][0]

        print(coluna," : ", mode_val)
        df=df.na.fill(mode_val,subset=[coluna])

Nome da coluna : O valor com maior frequencia (Moda)
PortaOrigem  :  53
PortaDestino  :  13363
Estado  :  CON
sTos  :  0
dTos  :  0


**Verificando Novamente Valores nulos**

In [15]:
for coluna in df.columns:
    print(coluna," : ", df.filter(df[coluna].isNull()).count())

Duracao  :  0
protocolo  :  0
PortaOrigem  :  0
Direcao  :  0
PortaDestino  :  0
Estado  :  0
sTos  :  0
dTos  :  0
TotPkts  :  0
TotBytes  :  0
SrcBytes  :  0
Rotulo  :  0


**Mostrar o dataset**

In [16]:
df.show(5)

+-----------+---------+-----------+-------+------------+------+----+----+-------+---------+---------+----------+
|    Duracao|protocolo|PortaOrigem|Direcao|PortaDestino|Estado|sTos|dTos|TotPkts| TotBytes| SrcBytes|    Rotulo|
+-----------+---------+-----------+-------+------------+------+----+----+-------+---------+---------+----------+
|1823.088379|      udp|       6881|    <->|        6881|   CON|   0|   0|      2|      214|      107|Background|
|2005.431641|      udp|       6881|    <->|        6881|   CON|   0|   0|      2|      214|      107|Background|
|1973.646729|      udp|       6881|    <->|        6881|   CON|   0|   0|      2|      214|      107|Background|
|3599.997803|      udp|      35248|    <->|       16200|   CON|   0|   0| 409227|207547419|182657149|Background|
|2059.387451|      tcp|         80|    <?>|       59067| RA_PA|   0|   0| 224275|266462578|  3199174|Background|
+-----------+---------+-----------+-------+------------+------+----+----+-------+---------+-----

**Tratando dados ruidosos** 

In [17]:
df.createOrReplaceTempView("tabela")
spark.sql("SELECT count(*) from tabela").show()

+--------+
|count(1)|
+--------+
| 2087508|
+--------+



In [18]:
spark.sql("SELECT DISTINCT Duracao from tabela").show()

+-----------+
|    Duracao|
+-----------+
|   6.940647|
|2934.411621|
| 148.079163|
|3542.162598|
|2935.303711|
|    3.71E-4|
|  16.357012|
|3599.464844|
|     7.5E-4|
|   0.130722|
| 191.274338|
|   0.336695|
|   2.020986|
|3176.427979|
|   0.327814|
|3049.628174|
|   0.114029|
|  19.934669|
|  18.339931|
|3006.249512|
+-----------+
only showing top 20 rows



In [19]:
spark.sql("SELECT DISTINCT PortaOrigem from tabela").show()

+-----------+
|PortaOrigem|
+-----------+
|      40574|
|       1238|
|      20683|
|      64590|
|      31261|
|      34061|
|      23364|
|      27974|
|      49855|
|      43527|
|      33569|
|      16574|
|       1342|
|      20497|
|      46465|
|      26755|
|      47501|
|      11748|
|      20735|
|      38723|
+-----------+
only showing top 20 rows



In [20]:
spark.sql("SELECT DISTINCT Direcao from tabela").show(truncate=False)

+-------+
|Direcao|
+-------+
|who    |
|<?     |
|?>     |
|<->    |
|<?>    |
|<-     |
|->     |
+-------+



**O Uso da funcao do replace do spark foi necessaria para tratar dados ruidosos na coluna Direcao**
Tratandosse de dados bidirecional, a coluna deveria ter direcao para duplo sentido, sentido esquedo e direito

In [21]:
# Funcao replace do sql

df=spark.sql("SELECT Duracao, protocolo, PortaOrigem, REPLACE(Direcao, '?', '-') as Direcao, PortaDestino, Estado\
            ,sTos, dTos, TotPkts, TotBytes, SrcBytes, Rotulo  from tabela")

In [22]:
df.createOrReplaceTempView("tabela")
spark.sql("SELECT DISTINCT Direcao from tabela").show(truncate=False)

+-------+
|Direcao|
+-------+
|who    |
|<->    |
|<-     |
|->     |
+-------+



In [23]:
df=df.withColumn('Direcao', regexp_replace('Direcao','who','<->'))
df.createOrReplaceTempView("tabela")
spark.sql("SELECT DISTINCT Direcao from tabela").show(truncate=False)

+-------+
|Direcao|
+-------+
|<->    |
|<-     |
|->     |
+-------+



**Elimanar linhas repetidas - a funcao distintct() e a dropDuplicates() (quando nao passado nenhum parametro funcianam da mesma maneira, eliminanam linhas repetidas e retornam um novo datafreme)**

In [24]:
#Total de linhas antes da eliminacao de linhas duplicadas 
df.count()

2087508

In [25]:
# Contar linhas duplicadas 

df.groupBy(df.columns).count().where(col('count') > 1).select(sum('count')).show()

+----------+
|sum(count)|
+----------+
|     60234|
+----------+



In [26]:
# Total de linhas depois de eliminacao de duplicatas
df.dropDuplicates().count()

2040586

In [27]:
df=df.dropDuplicates()

In [28]:
df.count()

2040586

**Continuacao de preprocessamento - Etapa de Transformacao de dados e Mineracao (aplicando algoritmos de machine learning)**

Creando indice para para as variaveis categoricas 

In [29]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder


In [30]:
df.printSchema()

root
 |-- Duracao: double (nullable = true)
 |-- protocolo: string (nullable = true)
 |-- PortaOrigem: integer (nullable = true)
 |-- Direcao: string (nullable = true)
 |-- PortaDestino: integer (nullable = true)
 |-- Estado: string (nullable = false)
 |-- sTos: integer (nullable = true)
 |-- dTos: integer (nullable = true)
 |-- TotPkts: integer (nullable = true)
 |-- TotBytes: long (nullable = true)
 |-- SrcBytes: integer (nullable = true)
 |-- Rotulo: string (nullable = true)



In [31]:
# criar o objeto da classe StringIndexer com especificacao da coluna de entrada e saída
SI_protocolo = StringIndexer(inputCol='protocolo',outputCol='protocolo_index')
SI_direcao = StringIndexer(inputCol='Direcao',outputCol='Direcao_index')
SI_estado = StringIndexer(inputCol='Estado',outputCol='Estado_index')
SI_rotulo = StringIndexer(inputCol='Rotulo',outputCol='rotulo_index')

In [32]:
# Aplicando a funcao transfoma(df), nos objectos criados 
df = SI_protocolo.fit(df).transform(df)
df = SI_direcao.fit(df).transform(df)
df = SI_estado.fit(df).transform(df)
df = SI_rotulo.fit(df).transform(df)

In [33]:
df.show(5)

+-----------+---------+-----------+-------+------------+------+----+----+-------+--------+--------+----------+---------------+-------------+------------+------------+
|    Duracao|protocolo|PortaOrigem|Direcao|PortaDestino|Estado|sTos|dTos|TotPkts|TotBytes|SrcBytes|    Rotulo|protocolo_index|Direcao_index|Estado_index|rotulo_index|
+-----------+---------+-----------+-------+------------+------+----+----+-------+--------+--------+----------+---------------+-------------+------------+------------+
|  28.917549|      tcp|      54145|    <->|       56950|RPA_PA|   0|   0|   6021| 5651087|  138137|Background|            1.0|          0.0|        50.0|         0.0|
|2476.756836|      tcp|      21456|    <->|         443|FPA_PA|   0|   0|    551|   60009|   32677|Background|            1.0|          0.0|        89.0|         0.0|
|1830.480713|      udp|      12114|    <->|       18720|   CON|   0|   0|      6|     407|     223|Background|            0.0|          0.0|         0.0|         0.0

In [34]:
df.columns

['Duracao',
 'protocolo',
 'PortaOrigem',
 'Direcao',
 'PortaDestino',
 'Estado',
 'sTos',
 'dTos',
 'TotPkts',
 'TotBytes',
 'SrcBytes',
 'Rotulo',
 'protocolo_index',
 'Direcao_index',
 'Estado_index',
 'rotulo_index']

In [35]:
# Verificando  os dados na tabela apos a aplicacao da transform(df)
df.select('Duracao','protocolo','protocolo_index','PortaOrigem','Direcao','Direcao_index','PortaDestino'\
          ,'Estado','Estado_index','sTos','dTos','TotPkts','TotBytes','SrcBytes','Rotulo','rotulo_index').show(3)
     

+-----------+---------+---------------+-----------+-------+-------------+------------+------+------------+----+----+-------+--------+--------+----------+------------+
|    Duracao|protocolo|protocolo_index|PortaOrigem|Direcao|Direcao_index|PortaDestino|Estado|Estado_index|sTos|dTos|TotPkts|TotBytes|SrcBytes|    Rotulo|rotulo_index|
+-----------+---------+---------------+-----------+-------+-------------+------------+------+------------+----+----+-------+--------+--------+----------+------------+
|  28.917549|      tcp|            1.0|      54145|    <->|          0.0|       56950|RPA_PA|        50.0|   0|   0|   6021| 5651087|  138137|Background|         0.0|
|2476.756836|      tcp|            1.0|      21456|    <->|          0.0|         443|FPA_PA|        89.0|   0|   0|    551|   60009|   32677|Background|         0.0|
|1830.480713|      udp|            0.0|      12114|    <->|          0.0|       18720|   CON|         0.0|   0|   0|      6|     407|     223|Background|         0.0

Alternativo

In [36]:

# create object and specify input and output column
OHE = OneHotEncoder(inputCols=['Duracao', 'protocolo_index','PortaOrigem','Direcao_index','PortaDestino','Estado_index','sTos','dTos','TotPkts','TotBytes','SrcBytes','rotulo_index'], outputCols=['Duracao_OHE', 'protocolo_index_OHE','PortaOrigem_OHE','Direcao_index_OHE','PortaDestino_OHE','Estado_index_OHE','sTos_OHE','dTos_OHE','TotPkts_OHE','TotBytes_OHE','SrcBytes_OHE','rotulo_index_OHE'])

# transform the data
#df = OHE.fit.transform(df)

# view and transform the data
#df.select('Duracao_OHE', 'protocolo_index_OHE','PortaOrigem_OHE','Direcao_index_OHE','PortaDestino_OHE','Estado_index_OHE').show(truncate=False)

     

**VectorAssembler criado ira conter a transformacao de dados usadas para testar algoritmos**


In [39]:
from pyspark.ml.feature import VectorAssembler

In [40]:
# specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['Duracao', 'protocolo_index','PortaOrigem','Direcao_index','PortaDestino'\
                                       ,'Estado_index','sTos','dTos','TotPkts','TotBytes'\
                                       ,'SrcBytes'], outputCol='features')

In [42]:

# datafreme para testar modelos
dfmodel = assembler.transform(df)
     

# mostrar as colunas do  vector transformado
dfmodel.select('features','rotulo_index').show(truncate=False)

+--------------------------------------------------------------------------+------------+
|features                                                                  |rotulo_index|
+--------------------------------------------------------------------------+------------+
|[28.917549,1.0,54145.0,0.0,56950.0,50.0,0.0,0.0,6021.0,5651087.0,138137.0]|0.0         |
|[2476.756836,1.0,21456.0,0.0,443.0,89.0,0.0,0.0,551.0,60009.0,32677.0]    |0.0         |
|(11,[0,2,4,8,9,10],[1830.480713,12114.0,18720.0,6.0,407.0,223.0])         |0.0         |
|[2207.80249,0.0,17214.0,1.0,12114.0,2.0,0.0,0.0,28.0,2128.0,2128.0]       |0.0         |
|(11,[0,2,4,8,9,10],[2173.965576,62713.0,12114.0,16.0,1074.0,579.0])       |0.0         |
|[200.340805,1.0,443.0,0.0,46020.0,105.0,0.0,0.0,17.0,1220.0,594.0]        |0.0         |
|(11,[0,2,4,8,9,10],[478.096497,12114.0,44120.0,28.0,1890.0,1036.0])       |0.0         |
|(11,[0,2,4,8,9,10],[2.67E-4,53.0,49669.0,2.0,324.0,76.0])                 |0.0         |
|[3559.997

In [43]:
#Datafreme do modelo (Com novos nomes para as variaveis -features e rotulo_index) 
dfmodelfinal = dfmodel.select(['features','rotulo_index'])
dfmodelfinal = dfmodelfinal.withColumnRenamed("rotulo_index","rotulo").withColumnRenamed('features','atributos')
dfmodelfinal.printSchema()

root
 |-- atributos: vector (nullable = true)
 |-- rotulo: double (nullable = false)



In [44]:

#Dividr dados para teste e treno (70% treino,30% teste)
df_treinamento,df_teste = dfmodelfinal.randomSplit([0.7,0.3])
     

In [45]:
print('Total para treino: ',df_treinamento.count())
print('Total de dados de teste: ',df_teste.count())

Total para treino:  1429480
Total de dados de teste:  611106


In [46]:
#Classes para treinamento de modelo na 
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier

from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

**Regressao Logistica**

In [47]:
#Create a logistic regression model object
from pyspark.ml.classification import LogisticRegression
log_reg=LogisticRegression(labelCol='rotulo', featuresCol='atributos', maxIter=50)
log_reg_Modelo=log_reg.fit(df_treinamento)



In [48]:
#lr_summary=log_reg.summary
     

#Overall accuracy of the classification model
#print("Acurace: ",lr_summary.accuracy)
     

#Area under ROC
#print('ROC: ')
#lr_summary.areaUnderROC
     

#Precision of both classes
#print('precision: ',lr_summary.precisionByLabel)
     


#Recall of both classes
#print('recal: ',lr_summary.recallByLabel)
     


In [49]:
predicao_teste = log_reg_Modelo.transform(df_teste)


In [50]:
predicao_teste.show()

+--------------------+------+--------------------+--------------------+----------+
|           atributos|rotulo|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|(11,[0,2,4,8,9,10...|   0.0|[10.5281544356944...|[0.97663323946268...|       0.0|
|(11,[0,2,4,8,9,10...|   0.0|[3.97560099774125...|[0.98546932960167...|       0.0|
|(11,[0,2,4,8,9,10...|   0.0|[6.90181061486329...|[0.98233481400460...|       0.0|
|(11,[0,2,4,8,9,10...|   0.0|[8.20492469444810...|[0.98046330448648...|       0.0|
|(11,[0,2,4,8,9,10...|   0.0|[9.23797833786984...|[0.97884271760250...|       0.0|
|(11,[0,2,4,8,9,10...|   0.0|[9.29355639715148...|[0.97875190443190...|       0.0|
|(11,[0,2,4,8,9,10...|   0.0|[9.71960527372954...|[0.97804290245102...|       0.0|
|(11,[0,2,4,8,9,10...|   0.0|[10.2334890184796...|[0.97715692454106...|       0.0|
|(11,[0,2,4,8,9,10...|   0.0|[10.2480730439199...|[0.97713126494073...|       0.0|
|(11

In [51]:
predicaorotulo=predicao_teste.select('rotulo','prediction')

In [52]:
Evaluator = MulticlassClassificationEvaluator(labelCol='rotulo', predictionCol='prediction', metricName='accuracy')
acuracia_LR = Evaluator.evaluate(predicaorotulo)

print("Acuracia: ", acuracia_LR)

Acuracia:  0.9430769784620017


In [53]:
Evaluator = MulticlassClassificationEvaluator(labelCol='rotulo', predictionCol='prediction', metricName='precisionByLabel')
precisao_LR = Evaluator.evaluate(predicaorotulo)

print("Precisao: ", precisao_LR)

Precisao:  0.9623450692778562


In [54]:
Evaluator = MulticlassClassificationEvaluator(labelCol='rotulo', predictionCol='prediction', metricName='recallByLabel')
recal_LR = Evaluator.evaluate(predicaorotulo)

print("Recal: ", recal_LR)

Recal:  0.975244460051915


**Naive Bayes**

In [55]:
NB= NaiveBayes(labelCol='rotulo',featuresCol='atributos', smoothing=1.0)
modeloNB = NB.fit(df_treinamento)

In [56]:
predicao_testeNB = modeloNB.transform(df_teste)

In [57]:
predicao_testeNB.show()

+--------------------+------+--------------------+--------------------+----------+
|           atributos|rotulo|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|(11,[0,2,4,8,9,10...|   0.0|[-49006.603165950...|       [0.0,0.0,1.0]|       2.0|
|(11,[0,2,4,8,9,10...|   0.0|[-16903.524856093...|       [0.0,0.0,1.0]|       2.0|
|(11,[0,2,4,8,9,10...|   0.0|[-31129.641362864...|       [0.0,0.0,1.0]|       2.0|
|(11,[0,2,4,8,9,10...|   0.0|[-37542.953372078...|       [0.0,0.0,1.0]|       2.0|
|(11,[0,2,4,8,9,10...|   0.0|[-42640.217433636...|       [0.0,0.0,1.0]|       2.0|
|(11,[0,2,4,8,9,10...|   0.0|[-42849.624438119...|       [0.0,0.0,1.0]|       2.0|
|(11,[0,2,4,8,9,10...|   0.0|[-44975.779703901...|       [0.0,0.0,1.0]|       2.0|
|(11,[0,2,4,8,9,10...|   0.0|[-47510.955183655...|       [0.0,0.0,1.0]|       2.0|
|(11,[0,2,4,8,9,10...|   0.0|[-47686.049294214...|       [0.0,0.0,1.0]|       2.0|
|(11

In [58]:
predicaorotuloNB=predicao_testeNB.select('rotulo','prediction')

In [59]:
Evaluator = MulticlassClassificationEvaluator(labelCol='rotulo', predictionCol='prediction', metricName='accuracy')
acuracia_NB = Evaluator.evaluate(predicaorotuloNB)

print("Acuracia: ", acuracia_NB)

Acuracia:  0.25176319656491675


In [60]:
Evaluator = MulticlassClassificationEvaluator(labelCol='rotulo', predictionCol='prediction', metricName='precisionByLabel')
precisao_NB = Evaluator.evaluate(predicaorotulo)

print("Precisao: ", precisao_NB)

Precisao:  0.9623450692778562


In [61]:
Evaluator = MulticlassClassificationEvaluator(labelCol='rotulo', predictionCol='prediction', metricName='recallByLabel')
recal_NB = Evaluator.evaluate(predicaorotulo)

print("Recal: ", recal_NB)

Recal:  0.975244460051915


  **GBT Classifier (Gradient Boost Tree Classifier)**

In [63]:
GBT_C =GBTClassifier(labelCol='rotulo',featuresCol='atributos')


In [None]:
modeloGBT = GBT_C.fit(df_treinamento)

**Random Forest Classifier**

In [72]:
rfc = RandomForestClassifier(labelCol='rotulo', featuresCol='atributos')


In [None]:
modeloRFC = rfc.fit(df_treinamento)

In [None]:
predicao_testeRFC=modeloRFC.transform(df_teste)

In [None]:
predicao_testeRFC.show(5)

**CENARIO 10**

In [43]:
df = spark.read.csv('C:/Users/Belilo/Desktop/SparkJupyter/datasets/ctu13/cenario10.csv', header=True,inferSchema=True)

In [44]:
df.show(5)

+----------+-----+-----+---+-----+--------+-----------+----+----+-------+--------+--------+
|     Label|Proto|Sport|Dir|Dport|   State|        Dur|sTos|dTos|TotPkts|TotBytes|SrcBytes|
+----------+-----+-----+---+-----+--------+-----------+----+----+-------+--------+--------+
|Background|  udp|60621|<->|63550|     CON| 2752.65625|   0|   0|      3|     435|     290|
|Background|  udp|51413|<->|63550|     CON|1849.315552|   0|   0|      3|     417|     272|
|Background|  udp|63195|<->|63550|     CON|2091.747314|   0|   0|      2|     290|     145|
|Background|  udp|39110|<->|63550|     CON|1535.769409|   0|   0|      2|     290|     145|
|Background|  tcp|33426|<?>|25443|FRPA_FPA|   0.002636|   0|   0|      6|     490|     321|
+----------+-----+-----+---+-----+--------+-----------+----+----+-------+--------+--------+
only showing top 5 rows



**Contar todos dados do datset**


In [45]:
df.count()

1309791

**Verificar os tipos de dados em cada coluna**

In [46]:
df.printSchema()

root
 |-- Label: string (nullable = true)
 |-- Proto: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Dir: string (nullable = true)
 |-- Dport: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Dur: double (nullable = true)
 |-- sTos: integer (nullable = true)
 |-- dTos: integer (nullable = true)
 |-- TotPkts: integer (nullable = true)
 |-- TotBytes: integer (nullable = true)
 |-- SrcBytes: integer (nullable = true)



**Remonear colunas**

In [47]:
df = df.withColumnRenamed('Proto','protocolo').withColumnRenamed('Sport','PortaOrigem').withColumnRenamed('Dir','Direcao')\
        .withColumnRenamed('Dport','PortaDestino').withColumnRenamed('State','Estado').withColumnRenamed('Dur','Duracao')\
        .withColumnRenamed('Label',"Rotulo")
df.show()

+----------+---------+-----------+-------+------------+--------+-----------+----+----+-------+--------+--------+
|    Rotulo|protocolo|PortaOrigem|Direcao|PortaDestino|  Estado|    Duracao|sTos|dTos|TotPkts|TotBytes|SrcBytes|
+----------+---------+-----------+-------+------------+--------+-----------+----+----+-------+--------+--------+
|Background|      udp|      60621|    <->|       63550|     CON| 2752.65625|   0|   0|      3|     435|     290|
|Background|      udp|      51413|    <->|       63550|     CON|1849.315552|   0|   0|      3|     417|     272|
|Background|      udp|      63195|    <->|       63550|     CON|2091.747314|   0|   0|      2|     290|     145|
|Background|      udp|      39110|    <->|       63550|     CON|1535.769409|   0|   0|      2|     290|     145|
|Background|      tcp|      33426|    <?>|       25443|FRPA_FPA|   0.002636|   0|   0|      6|     490|     321|
|Background|      udp|      41915|    <->|       43087|     CON|   72.43679|   0|   0|  23849|24

**Mostrar Nomes das Colunas**

In [48]:
df.columns

['Rotulo',
 'protocolo',
 'PortaOrigem',
 'Direcao',
 'PortaDestino',
 'Estado',
 'Duracao',
 'sTos',
 'dTos',
 'TotPkts',
 'TotBytes',
 'SrcBytes']

In [49]:
df.show()
               

+----------+---------+-----------+-------+------------+--------+-----------+----+----+-------+--------+--------+
|    Rotulo|protocolo|PortaOrigem|Direcao|PortaDestino|  Estado|    Duracao|sTos|dTos|TotPkts|TotBytes|SrcBytes|
+----------+---------+-----------+-------+------------+--------+-----------+----+----+-------+--------+--------+
|Background|      udp|      60621|    <->|       63550|     CON| 2752.65625|   0|   0|      3|     435|     290|
|Background|      udp|      51413|    <->|       63550|     CON|1849.315552|   0|   0|      3|     417|     272|
|Background|      udp|      63195|    <->|       63550|     CON|2091.747314|   0|   0|      2|     290|     145|
|Background|      udp|      39110|    <->|       63550|     CON|1535.769409|   0|   0|      2|     290|     145|
|Background|      tcp|      33426|    <?>|       25443|FRPA_FPA|   0.002636|   0|   0|      6|     490|     321|
|Background|      udp|      41915|    <->|       43087|     CON|   72.43679|   0|   0|  23849|24

**Selecionar Colunas, atribuir ao novo datafreme**

In [50]:
df = df.select(col('Duracao'),col('protocolo'),col('PortaOrigem'),col('Direcao'),col('PortaDestino'),col('Estado')\
         ,col('sTos'),col('dTos'),col('TotPkts'),col('TotBytes'),col('SrcBytes'),col('Rotulo'))

df.show(5)

+-----------+---------+-----------+-------+------------+--------+----+----+-------+--------+--------+----------+
|    Duracao|protocolo|PortaOrigem|Direcao|PortaDestino|  Estado|sTos|dTos|TotPkts|TotBytes|SrcBytes|    Rotulo|
+-----------+---------+-----------+-------+------------+--------+----+----+-------+--------+--------+----------+
| 2752.65625|      udp|      60621|    <->|       63550|     CON|   0|   0|      3|     435|     290|Background|
|1849.315552|      udp|      51413|    <->|       63550|     CON|   0|   0|      3|     417|     272|Background|
|2091.747314|      udp|      63195|    <->|       63550|     CON|   0|   0|      2|     290|     145|Background|
|1535.769409|      udp|      39110|    <->|       63550|     CON|   0|   0|      2|     290|     145|Background|
|   0.002636|      tcp|      33426|    <?>|       25443|FRPA_FPA|   0|   0|      6|     490|     321|Background|
+-----------+---------+-----------+-------+------------+--------+----+----+-------+--------+----

**Alterar o tipo de dados**

In [51]:
df.printSchema()

root
 |-- Duracao: double (nullable = true)
 |-- protocolo: string (nullable = true)
 |-- PortaOrigem: string (nullable = true)
 |-- Direcao: string (nullable = true)
 |-- PortaDestino: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- sTos: integer (nullable = true)
 |-- dTos: integer (nullable = true)
 |-- TotPkts: integer (nullable = true)
 |-- TotBytes: integer (nullable = true)
 |-- SrcBytes: integer (nullable = true)
 |-- Rotulo: string (nullable = true)



In [52]:
df = df.withColumn('PortaOrigem',col('Portaorigem').cast(IntegerType()))\
        .withColumn('PortaDestino',col('PortaDestino').cast(IntegerType()))

df.printSchema()

root
 |-- Duracao: double (nullable = true)
 |-- protocolo: string (nullable = true)
 |-- PortaOrigem: integer (nullable = true)
 |-- Direcao: string (nullable = true)
 |-- PortaDestino: integer (nullable = true)
 |-- Estado: string (nullable = true)
 |-- sTos: integer (nullable = true)
 |-- dTos: integer (nullable = true)
 |-- TotPkts: integer (nullable = true)
 |-- TotBytes: integer (nullable = true)
 |-- SrcBytes: integer (nullable = true)
 |-- Rotulo: string (nullable = true)



**Tratando Valores Nulos / dados faltante**

**Verificar Valores Nulos em cada coluna {Nome_coluna : Qtd de Valores nulos}**

In [53]:
for coluna in df.columns:
    print(coluna," : ", df.filter(df[coluna].isNull()).count())

Duracao  :  0
protocolo  :  0
PortaOrigem  :  135176
Direcao  :  0
PortaDestino  :  135709
Estado  :  1228
sTos  :  12852
dTos  :  186791
TotPkts  :  0
TotBytes  :  0
SrcBytes  :  0
Rotulo  :  0


**Preencher Valores nulos apartir do calculo da moda (a frequencia dos dados)**

In [54]:
# A impressao dos valores eh so para mostrar qual eh o valor mais frequente (a moda) em cda uma das colunas que posuem dados nulos
print("Nome da coluna : O valor com maior frequencia (Moda)")

for coluna in df.columns:
    if df.filter(df[coluna].isNull()).count() != 0:
        count_mode_val = df.groupBy(coluna).count().filter(col(coluna).isNotNull())\
                            .agg(max("count")).collect()[0][0]
        mode_val = df.groupBy(coluna).count().filter(col(coluna).isNotNull())\
                            .filter(col("count") == count_mode_val).select(coluna).collect()[0][0]

        print(coluna," : ", mode_val)
        df=df.na.fill(mode_val,subset=[coluna])

Nome da coluna : O valor com maior frequencia (Moda)
PortaOrigem  :  13363
PortaDestino  :  53
Estado  :  CON
sTos  :  0
dTos  :  0


**Verificando Novamente Valores nulos**

In [55]:
for coluna in df.columns:
    print(coluna," : ", df.filter(df[coluna].isNull()).count())

Duracao  :  0
protocolo  :  0
PortaOrigem  :  0
Direcao  :  0
PortaDestino  :  0
Estado  :  0
sTos  :  0
dTos  :  0
TotPkts  :  0
TotBytes  :  0
SrcBytes  :  0
Rotulo  :  0


**Mostrar o dataset**

In [56]:
df.show(5)

+-----------+---------+-----------+-------+------------+--------+----+----+-------+--------+--------+----------+
|    Duracao|protocolo|PortaOrigem|Direcao|PortaDestino|  Estado|sTos|dTos|TotPkts|TotBytes|SrcBytes|    Rotulo|
+-----------+---------+-----------+-------+------------+--------+----+----+-------+--------+--------+----------+
| 2752.65625|      udp|      60621|    <->|       63550|     CON|   0|   0|      3|     435|     290|Background|
|1849.315552|      udp|      51413|    <->|       63550|     CON|   0|   0|      3|     417|     272|Background|
|2091.747314|      udp|      63195|    <->|       63550|     CON|   0|   0|      2|     290|     145|Background|
|1535.769409|      udp|      39110|    <->|       63550|     CON|   0|   0|      2|     290|     145|Background|
|   0.002636|      tcp|      33426|    <?>|       25443|FRPA_FPA|   0|   0|      6|     490|     321|Background|
+-----------+---------+-----------+-------+------------+--------+----+----+-------+--------+----

**Tratando dados ruidosos** 

**Modando nome da coluna porta origem e destino para que as consultas da tempView funcionem
    (o spark.sql nao reconhece nomes de colunas unidas por " - ", ele considera como se fosse dois nomes)**

In [57]:
df.show(5)

+-----------+---------+-----------+-------+------------+--------+----+----+-------+--------+--------+----------+
|    Duracao|protocolo|PortaOrigem|Direcao|PortaDestino|  Estado|sTos|dTos|TotPkts|TotBytes|SrcBytes|    Rotulo|
+-----------+---------+-----------+-------+------------+--------+----+----+-------+--------+--------+----------+
| 2752.65625|      udp|      60621|    <->|       63550|     CON|   0|   0|      3|     435|     290|Background|
|1849.315552|      udp|      51413|    <->|       63550|     CON|   0|   0|      3|     417|     272|Background|
|2091.747314|      udp|      63195|    <->|       63550|     CON|   0|   0|      2|     290|     145|Background|
|1535.769409|      udp|      39110|    <->|       63550|     CON|   0|   0|      2|     290|     145|Background|
|   0.002636|      tcp|      33426|    <?>|       25443|FRPA_FPA|   0|   0|      6|     490|     321|Background|
+-----------+---------+-----------+-------+------------+--------+----+----+-------+--------+----

In [58]:
df.createOrReplaceTempView("tabela")
spark.sql("SELECT count(*) from tabela").show()

+--------+
|count(1)|
+--------+
| 1309791|
+--------+



In [59]:
spark.sql("SELECT DISTINCT Duracao from tabela").show()

+-----------+
|    Duracao|
+-----------+
|    7.18E-4|
|3017.522949|
| 3552.17749|
| 416.194489|
|     7.5E-4|
|  516.55481|
| 114.511345|
|    0.00195|
|   0.059421|
|3228.565918|
| 244.545197|
| 309.717377|
|    3.71E-4|
|    0.19075|
|   1.511531|
|   0.179585|
|   0.001894|
|  16.983456|
|3376.620605|
|3096.484131|
+-----------+
only showing top 20 rows



In [60]:
spark.sql("SELECT DISTINCT PortaOrigem from tabela").count()

63057

In [61]:
spark.sql("SELECT DISTINCT PortaOrigem from tabela").show()

+-----------+
|PortaOrigem|
+-----------+
|       1238|
|      53565|
|      53963|
|       4935|
|      35820|
|      43527|
|      13840|
|      37111|
|      63155|
|       6620|
|      44822|
|      29894|
|      53634|
|      49717|
|      20924|
|      50348|
|       2866|
|      57201|
|      23336|
|      35361|
+-----------+
only showing top 20 rows



In [62]:
spark.sql("SELECT DISTINCT Direcao from tabela").show(truncate=False)

+-------+
|Direcao|
+-------+
|who    |
|?>     |
|<->    |
|<?>    |
|<-     |
|->     |
+-------+



**O Uso da funcao do replace do spark foi necessaria para tratar dados ruidosos na coluna Direcao**
Tratandosse de dados bidirecional, a coluna deveria ter direcao para duplo sentido, sentido esquedo e direito

In [63]:
# Funcao replace do sql

df=spark.sql("SELECT Duracao, protocolo, PortaOrigem, REPLACE(Direcao, '?', '-') as Direcao, PortaDestino, Estado\
            ,sTos, dTos, TotPkts, TotBytes, SrcBytes, Rotulo  from tabela")

In [64]:
df.createOrReplaceTempView("tabela")
spark.sql("SELECT DISTINCT Direcao from tabela").show(truncate=False)

+-------+
|Direcao|
+-------+
|who    |
|<->    |
|<-     |
|->     |
+-------+



In [65]:
df=df.withColumn('Direcao', regexp_replace('Direcao','who','<->'))
df.createOrReplaceTempView("tabela")
spark.sql("SELECT DISTINCT Direcao from tabela").show(truncate=False)

+-------+
|Direcao|
+-------+
|<->    |
|<-     |
|->     |
+-------+



**Elimanar linhas repetidas - a funcao distintct() e a dropDuplicates() (quando nao passado nenhum parametro funcianam da mesma maneira, eliminanam linhas repetidas e retornam um novo datafreme)**

In [66]:
#Total de linhas antes da eliminacao de linhas duplicadas 
df.count()

1309791

In [67]:
# Contar linhas duplicadas 

df.groupBy(df.columns).count().where(col('count') > 1).select(sum('count')).show()

+----------+
|sum(count)|
+----------+
|    125547|
+----------+



In [68]:
# Total de linhas depois de eliminacao de duplicatas
df.dropDuplicates().count()

1189304

In [69]:
df=df.dropDuplicates()

In [70]:
df.count()

1189304

**CENARIO 11**

In [71]:
df = spark.read.csv('C:/Users/Belilo/Desktop/SparkJupyter/datasets/ctu13/cenario11.csv', header=True,inferSchema=True)

In [72]:
df.show(5)

+----------+-----+-----+---+-----+-------+----------+----+----+-------+---------+--------+
|     Label|Proto|Sport|Dir|Dport|  State|       Dur|sTos|dTos|TotPkts| TotBytes|SrcBytes|
+----------+-----+-----+---+-----+-------+----------+----+----+-------+---------+--------+
|Background|  tcp| 1078|<?>|   80|RPA_FPA| 83.062141|   0|   0|  43065| 40974671| 1033777|
|Background|  tcp|13121|<?>|62860| RPA_PA|497.720459|   0|   0| 326962|132430976| 7076046|
|Background|  tcp|   80|<?>| 3088|   PA_A|971.288147|   0|   0|   7912|  7356876| 7153650|
|Background|  tcp|54518|<?>|  993|  PA_PA|899.996399|   0|   0|     48|     5728|    3008|
|Background|  tcp|19083| ?>| 2185|   RPA_| 38.753445|   0|null|      7|      456|     456|
+----------+-----+-----+---+-----+-------+----------+----+----+-------+---------+--------+
only showing top 5 rows



**Contar todos dados do datset**


In [73]:
df.count()

107251

**Verificar os tipos de dados em cada coluna**

In [74]:
df.printSchema()

root
 |-- Label: string (nullable = true)
 |-- Proto: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Dir: string (nullable = true)
 |-- Dport: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Dur: double (nullable = true)
 |-- sTos: integer (nullable = true)
 |-- dTos: integer (nullable = true)
 |-- TotPkts: integer (nullable = true)
 |-- TotBytes: integer (nullable = true)
 |-- SrcBytes: integer (nullable = true)



**Remonear colunas**

In [75]:
df = df.withColumnRenamed('Proto','protocolo').withColumnRenamed('Sport','PortaOrigem').withColumnRenamed('Dir','Direcao')\
        .withColumnRenamed('Dport','PortaDestino').withColumnRenamed('State','Estado').withColumnRenamed('Dur','Duracao')\
        .withColumnRenamed('Label',"Rotulo")
df.show()

+----------+---------+-----------+-------+------------+-------+----------+----+----+-------+---------+--------+
|    Rotulo|protocolo|PortaOrigem|Direcao|PortaDestino| Estado|   Duracao|sTos|dTos|TotPkts| TotBytes|SrcBytes|
+----------+---------+-----------+-------+------------+-------+----------+----+----+-------+---------+--------+
|Background|      tcp|       1078|    <?>|          80|RPA_FPA| 83.062141|   0|   0|  43065| 40974671| 1033777|
|Background|      tcp|      13121|    <?>|       62860| RPA_PA|497.720459|   0|   0| 326962|132430976| 7076046|
|Background|      tcp|         80|    <?>|        3088|   PA_A|971.288147|   0|   0|   7912|  7356876| 7153650|
|Background|      tcp|      54518|    <?>|         993|  PA_PA|899.996399|   0|   0|     48|     5728|    3008|
|Background|      tcp|      19083|     ?>|        2185|   RPA_| 38.753445|   0|null|      7|      456|     456|
|Background|      tcp|       3095|    <?>|          80|  RA_PA|662.994385|   0|   0|   6137|  5700818|  

**Mostrar Nomes das Colunas**

In [76]:
df.columns

['Rotulo',
 'protocolo',
 'PortaOrigem',
 'Direcao',
 'PortaDestino',
 'Estado',
 'Duracao',
 'sTos',
 'dTos',
 'TotPkts',
 'TotBytes',
 'SrcBytes']

In [77]:
df.show()
               

+----------+---------+-----------+-------+------------+-------+----------+----+----+-------+---------+--------+
|    Rotulo|protocolo|PortaOrigem|Direcao|PortaDestino| Estado|   Duracao|sTos|dTos|TotPkts| TotBytes|SrcBytes|
+----------+---------+-----------+-------+------------+-------+----------+----+----+-------+---------+--------+
|Background|      tcp|       1078|    <?>|          80|RPA_FPA| 83.062141|   0|   0|  43065| 40974671| 1033777|
|Background|      tcp|      13121|    <?>|       62860| RPA_PA|497.720459|   0|   0| 326962|132430976| 7076046|
|Background|      tcp|         80|    <?>|        3088|   PA_A|971.288147|   0|   0|   7912|  7356876| 7153650|
|Background|      tcp|      54518|    <?>|         993|  PA_PA|899.996399|   0|   0|     48|     5728|    3008|
|Background|      tcp|      19083|     ?>|        2185|   RPA_| 38.753445|   0|null|      7|      456|     456|
|Background|      tcp|       3095|    <?>|          80|  RA_PA|662.994385|   0|   0|   6137|  5700818|  

**Selecionar Colunas, atribuir ao novo datafreme**

In [78]:
df = df.select(col('Duracao'),col('protocolo'),col('PortaOrigem'),col('Direcao'),col('PortaDestino'),col('Estado')\
         ,col('sTos'),col('dTos'),col('TotPkts'),col('TotBytes'),col('SrcBytes'),col('Rotulo'))

df.show(5)

+----------+---------+-----------+-------+------------+-------+----+----+-------+---------+--------+----------+
|   Duracao|protocolo|PortaOrigem|Direcao|PortaDestino| Estado|sTos|dTos|TotPkts| TotBytes|SrcBytes|    Rotulo|
+----------+---------+-----------+-------+------------+-------+----+----+-------+---------+--------+----------+
| 83.062141|      tcp|       1078|    <?>|          80|RPA_FPA|   0|   0|  43065| 40974671| 1033777|Background|
|497.720459|      tcp|      13121|    <?>|       62860| RPA_PA|   0|   0| 326962|132430976| 7076046|Background|
|971.288147|      tcp|         80|    <?>|        3088|   PA_A|   0|   0|   7912|  7356876| 7153650|Background|
|899.996399|      tcp|      54518|    <?>|         993|  PA_PA|   0|   0|     48|     5728|    3008|Background|
| 38.753445|      tcp|      19083|     ?>|        2185|   RPA_|   0|null|      7|      456|     456|Background|
+----------+---------+-----------+-------+------------+-------+----+----+-------+---------+--------+----

**Alterar o tipo de dados**

In [79]:
df.printSchema()

root
 |-- Duracao: double (nullable = true)
 |-- protocolo: string (nullable = true)
 |-- PortaOrigem: string (nullable = true)
 |-- Direcao: string (nullable = true)
 |-- PortaDestino: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- sTos: integer (nullable = true)
 |-- dTos: integer (nullable = true)
 |-- TotPkts: integer (nullable = true)
 |-- TotBytes: integer (nullable = true)
 |-- SrcBytes: integer (nullable = true)
 |-- Rotulo: string (nullable = true)



In [80]:
df = df.withColumn('PortaOrigem',col('Portaorigem').cast(IntegerType()))\
        .withColumn('PortaDestino',col('PortaDestino').cast(IntegerType()))

df.printSchema()

root
 |-- Duracao: double (nullable = true)
 |-- protocolo: string (nullable = true)
 |-- PortaOrigem: integer (nullable = true)
 |-- Direcao: string (nullable = true)
 |-- PortaDestino: integer (nullable = true)
 |-- Estado: string (nullable = true)
 |-- sTos: integer (nullable = true)
 |-- dTos: integer (nullable = true)
 |-- TotPkts: integer (nullable = true)
 |-- TotBytes: integer (nullable = true)
 |-- SrcBytes: integer (nullable = true)
 |-- Rotulo: string (nullable = true)



**Tratando Valores Nulos / dados faltante**

**Verificar Valores Nulos em cada coluna {Nome_coluna : Qtd de Valores nulos}**

In [81]:
for coluna in df.columns:
    print(coluna," : ", df.filter(df[coluna].isNull()).count())

Duracao  :  0
protocolo  :  0
PortaOrigem  :  12946
Direcao  :  0
PortaDestino  :  12990
Estado  :  91
sTos  :  980
dTos  :  16959
TotPkts  :  0
TotBytes  :  0
SrcBytes  :  0
Rotulo  :  0


**Preencher Valores nulos apartir do calculo da moda (a frequencia dos dados)**

In [82]:
# A impressao dos valores eh so para mostrar qual eh o valor mais frequente (a moda) em cda uma das colunas que posuem dados nulos
print("Nome da coluna : O valor com maior frequencia (Moda)")

for coluna in df.columns:
    if df.filter(df[coluna].isNull()).count() != 0:
        count_mode_val = df.groupBy(coluna).count().filter(col(coluna).isNotNull())\
                            .agg(max("count")).collect()[0][0]
        mode_val = df.groupBy(coluna).count().filter(col(coluna).isNotNull())\
                            .filter(col("count") == count_mode_val).select(coluna).collect()[0][0]

        print(coluna," : ", mode_val)
        df=df.na.fill(mode_val,subset=[coluna])

Nome da coluna : O valor com maior frequencia (Moda)
PortaOrigem  :  13363
PortaDestino  :  53
Estado  :  CON
sTos  :  0
dTos  :  0


**Verificando Novamente Valores nulos**

In [83]:
for coluna in df.columns:
    print(coluna," : ", df.filter(df[coluna].isNull()).count())

Duracao  :  0
protocolo  :  0
PortaOrigem  :  0
Direcao  :  0
PortaDestino  :  0
Estado  :  0
sTos  :  0
dTos  :  0
TotPkts  :  0
TotBytes  :  0
SrcBytes  :  0
Rotulo  :  0


**Mostrar o dataset**

In [84]:
df.show(5)

+----------+---------+-----------+-------+------------+-------+----+----+-------+---------+--------+----------+
|   Duracao|protocolo|PortaOrigem|Direcao|PortaDestino| Estado|sTos|dTos|TotPkts| TotBytes|SrcBytes|    Rotulo|
+----------+---------+-----------+-------+------------+-------+----+----+-------+---------+--------+----------+
| 83.062141|      tcp|       1078|    <?>|          80|RPA_FPA|   0|   0|  43065| 40974671| 1033777|Background|
|497.720459|      tcp|      13121|    <?>|       62860| RPA_PA|   0|   0| 326962|132430976| 7076046|Background|
|971.288147|      tcp|         80|    <?>|        3088|   PA_A|   0|   0|   7912|  7356876| 7153650|Background|
|899.996399|      tcp|      54518|    <?>|         993|  PA_PA|   0|   0|     48|     5728|    3008|Background|
| 38.753445|      tcp|      19083|     ?>|        2185|   RPA_|   0|   0|      7|      456|     456|Background|
+----------+---------+-----------+-------+------------+-------+----+----+-------+---------+--------+----

**Tratando dados ruidosos** 

**Modando nome da coluna porta origem e destino para que as consultas da tempView funcionem
    (o spark.sql nao reconhece nomes de colunas unidas por " - ", ele considera como se fosse dois nomes)**

In [85]:
df.show(5)

+----------+---------+-----------+-------+------------+-------+----+----+-------+---------+--------+----------+
|   Duracao|protocolo|PortaOrigem|Direcao|PortaDestino| Estado|sTos|dTos|TotPkts| TotBytes|SrcBytes|    Rotulo|
+----------+---------+-----------+-------+------------+-------+----+----+-------+---------+--------+----------+
| 83.062141|      tcp|       1078|    <?>|          80|RPA_FPA|   0|   0|  43065| 40974671| 1033777|Background|
|497.720459|      tcp|      13121|    <?>|       62860| RPA_PA|   0|   0| 326962|132430976| 7076046|Background|
|971.288147|      tcp|         80|    <?>|        3088|   PA_A|   0|   0|   7912|  7356876| 7153650|Background|
|899.996399|      tcp|      54518|    <?>|         993|  PA_PA|   0|   0|     48|     5728|    3008|Background|
| 38.753445|      tcp|      19083|     ?>|        2185|   RPA_|   0|   0|      7|      456|     456|Background|
+----------+---------+-----------+-------+------------+-------+----+----+-------+---------+--------+----

In [86]:
df.createOrReplaceTempView("tabela")
spark.sql("SELECT count(*) from tabela").show()

+--------+
|count(1)|
+--------+
|  107251|
+--------+



In [87]:
spark.sql("SELECT DISTINCT Duracao from tabela").show()

+----------+
|   Duracao|
+----------+
|252.235641|
| 965.60437|
|948.458923|
|   0.38899|
|950.313721|
|932.998535|
| 29.553505|
|    7.5E-4|
|   3.71E-4|
| 33.070011|
| 21.092537|
|   7.18E-4|
|  0.154451|
|  1.244861|
|   8.77E-4|
|  0.999027|
|  0.218348|
|  0.187115|
|862.619385|
|  0.028589|
+----------+
only showing top 20 rows



In [88]:
spark.sql("SELECT DISTINCT PortaOrigem from tabela").count()

40804

In [89]:
spark.sql("SELECT DISTINCT PortaOrigem from tabela").show()

+-----------+
|PortaOrigem|
+-----------+
|      65408|
|      15790|
|      43688|
|       4818|
|      43527|
|      52697|
|       4935|
|      49331|
|      40386|
|      51415|
|       2122|
|      30970|
|      57693|
|      17420|
|      53634|
|      49308|
|       1088|
|      50353|
|      12799|
|      56110|
+-----------+
only showing top 20 rows



In [90]:
spark.sql("SELECT DISTINCT Direcao from tabela").show(truncate=False)

+-------+
|Direcao|
+-------+
|who    |
|?>     |
|<->    |
|<?>    |
|<-     |
|->     |
+-------+



**O Uso da funcao do replace do spark foi necessaria para tratar dados ruidosos na coluna Direcao**
Tratandosse de dados bidirecional, a coluna deveria ter direcao para duplo sentido, sentido esquedo e direito

In [91]:
# Funcao replace do sql

df=spark.sql("SELECT Duracao, protocolo, PortaOrigem, REPLACE(Direcao, '?', '-') as Direcao, PortaDestino, Estado\
            ,sTos, dTos, TotPkts, TotBytes, SrcBytes, Rotulo  from tabela")

In [92]:
df.createOrReplaceTempView("tabela")
spark.sql("SELECT DISTINCT Direcao from tabela").show(truncate=False)

+-------+
|Direcao|
+-------+
|who    |
|<->    |
|<-     |
|->     |
+-------+



In [93]:
df=df.withColumn('Direcao', regexp_replace('Direcao','who','<->'))
df.createOrReplaceTempView("tabela")
spark.sql("SELECT DISTINCT Direcao from tabela").show(truncate=False)

+-------+
|Direcao|
+-------+
|<->    |
|<-     |
|->     |
+-------+



**Elimanar linhas repetidas - a funcao distintct() e a dropDuplicates() (quando nao passado nenhum parametro funcianam da mesma maneira, eliminanam linhas repetidas e retornam um novo datafreme)**

In [94]:
#Total de linhas antes da eliminacao de linhas duplicadas 
df.count()

107251

In [95]:
# Contar linhas duplicadas 

df.groupBy(df.columns).count().where(col('count') > 1).select(sum('count')).show()

+----------+
|sum(count)|
+----------+
|     10675|
+----------+



In [96]:
# Total de linhas depois de eliminacao de duplicatas
df.dropDuplicates().count()

97178

In [97]:
df=df.dropDuplicates()

In [98]:
df.count()

97178