## Limpeza de dados Cartola ano 2014

In [1]:
from pyspark.sql import HiveContext
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import lower, col, lit, regexp_replace
import pandas as pd
import json
import requests

## Times

In [2]:
# Load the 2014 teams CSV (the first row is the header) and order it by team ID.
# ID is still a string at this point, so the ordering is lexicographic.
times_2014_df = spark.read.csv("/cartola/data/2014/2014_times.csv", header=True)
sorted_times_2014_df = times_2014_df.sort(times_2014_df.ID.asc())

# TODO: check that the file exists in HDFS before reading it.

# Tag every row with the season year. The column is added to the sorted frame
# (which was previously computed but never used), matching how the players
# section builds its `ano` frame.
times_2014_ano = sorted_times_2014_df.withColumn('ano', lit(2014))
times_2014_ano.show()


+---+-------------+----------+-------------+----+
| ID|         Nome|Abreviacao|         Slug| ano|
+---+-------------+----------+-------------+----+
|262|     flamengo|       FLA|     flamengo|2014|
|263|     botafogo|       BOT|     botafogo|2014|
|264|  corinthians|       COR|  corinthians|2014|
|265|        bahia|       BAH|        bahia|2014|
|266|   fluminense|       FLU|   fluminense|2014|
|275|    palmeiras|       PAL|    palmeiras|2014|
|276|    são paulo|       SAO|    sao-paulo|2014|
|277|       santos|       SAN|       santos|2014|
|282|  atlético-mg|       CAM|  atletico-mg|2014|
|283|     cruzeiro|       CRU|     cruzeiro|2014|
|284|       grêmio|       GRE|       gremio|2014|
|285|internacional|       INT|internacional|2014|
|287|      vitória|       VIT|      vitoria|2014|
|288|     criciúma|       CRI|     criciuma|2014|
|290|        goiás|       GOI|        goias|2014|
|292|        sport|       SPO|        sport|2014|
|293|  atlético-pr|       CAP|  atletico-pr|2014|


In [3]:
# Persist the 2014 teams as Parquet. mode('overwrite') replaces a previous export
# of this year so the cell can be re-run; without a mode the write raises when the
# target path already exists.
times_2014_ano.write.mode('overwrite').parquet('/cartola/clean/times/2014')
# Read back every directory under /cartola/clean/times (the wildcard is not limited to 2014).
times_2014_parquet = spark.read.parquet('/cartola/clean/times/*')

In [4]:
# Preview the teams read back from Parquet and display the resulting schema.
times_2014_parquet.show()
times_2014_parquet.schema

# TODO: change ID to int (it is currently a string).

+---+-------------+----------+-------------+----+
| ID|         Nome|Abreviacao|         Slug| ano|
+---+-------------+----------+-------------+----+
|262|     flamengo|       FLA|     flamengo|2014|
|263|     botafogo|       BOT|     botafogo|2014|
|264|  corinthians|       COR|  corinthians|2014|
|265|        bahia|       BAH|        bahia|2014|
|266|   fluminense|       FLU|   fluminense|2014|
|275|    palmeiras|       PAL|    palmeiras|2014|
|276|    são paulo|       SAO|    sao-paulo|2014|
|277|       santos|       SAN|       santos|2014|
|282|  atlético-mg|       CAM|  atletico-mg|2014|
|283|     cruzeiro|       CRU|     cruzeiro|2014|
|284|       grêmio|       GRE|       gremio|2014|
|285|internacional|       INT|internacional|2014|
|287|      vitória|       VIT|      vitoria|2014|
|288|     criciúma|       CRI|     criciuma|2014|
|290|        goiás|       GOI|        goias|2014|
|292|        sport|       SPO|        sport|2014|
|293|  atlético-pr|       CAP|  atletico-pr|2014|


StructType(List(StructField(ID,StringType,true),StructField(Nome,StringType,true),StructField(Abreviacao,StringType,true),StructField(Slug,StringType,true),StructField(ano,IntegerType,true)))

In [5]:
# Replace the string ID column with its integer-typed version.
id_as_int = times_2014_parquet.ID.cast(IntegerType())
times_2014_parquet = times_2014_parquet.withColumn("ID", id_as_int)

In [6]:
# Display the schema to confirm the ID column type.
times_2014_parquet.schema

StructType(List(StructField(ID,IntegerType,true),StructField(Nome,StringType,true),StructField(Abreviacao,StringType,true),StructField(Slug,StringType,true),StructField(ano,IntegerType,true)))

In [7]:
# Preview the teams table.
times_2014_parquet.show()

# Print the total number of rows.
print(times_2014_parquet.count())

+---+-------------+----------+-------------+----+
| ID|         Nome|Abreviacao|         Slug| ano|
+---+-------------+----------+-------------+----+
|262|     flamengo|       FLA|     flamengo|2014|
|263|     botafogo|       BOT|     botafogo|2014|
|264|  corinthians|       COR|  corinthians|2014|
|265|        bahia|       BAH|        bahia|2014|
|266|   fluminense|       FLU|   fluminense|2014|
|275|    palmeiras|       PAL|    palmeiras|2014|
|276|    são paulo|       SAO|    sao-paulo|2014|
|277|       santos|       SAN|       santos|2014|
|282|  atlético-mg|       CAM|  atletico-mg|2014|
|283|     cruzeiro|       CRU|     cruzeiro|2014|
|284|       grêmio|       GRE|       gremio|2014|
|285|internacional|       INT|internacional|2014|
|287|      vitória|       VIT|      vitoria|2014|
|288|     criciúma|       CRI|     criciuma|2014|
|290|        goiás|       GOI|        goias|2014|
|292|        sport|       SPO|        sport|2014|
|293|  atlético-pr|       CAP|  atletico-pr|2014|


## Jogadores

In [8]:
# Load the 2014 players CSV; the first row holds the column names.
jogadores_2014_df = spark.read.csv(
    path="/cartola/data/2014/2014_jogadores.csv",
    header=True,
)
# Order the players by ascending ID.
sorted_jogadores_2014_df = jogadores_2014_df.sort(jogadores_2014_df["ID"].asc())


In [9]:
# Add the season column: ano = 2014 on every row of the sorted players frame.
jogadores_2014_ano = sorted_jogadores_2014_df.withColumn('ano', lit(2014))
jogadores_2014_ano.show()

+-----+-------------------+-------+---------+----+
|   ID|            Apelido|ClubeID|PosicaoID| ano|
+-----+-------------------+-------+---------+----+
|36443|               Dida|    285|        1|2014|
|36522|               Alex|    294|        4|2014|
|36540|               Juan|    285|        3|2014|
|36612|         Zé Roberto|    284|        4|2014|
|36728|              Índio|    285|        3|2014|
|36775|    Marcos Assunção|    316|        4|2014|
|36790|              Josué|    282|        4|2014|
|36792|             Araújo|    290|        5|2014|
|36802|  Ronaldinho Gaúcho|    282|        4|2014|
|36856|         Celso Roth|    294|        6|2014|
|36934|     Muricy Ramalho|    276|        6|2014|
|36940|Oswaldo De Oliveira|    277|        6|2014|
|36943|      Paulo Autuori|    282|        6|2014|
|37245|      Guto Ferreira|    316|        6|2014|
|37246|         Ney Franco|    287|        6|2014|
|37281|       Mano Menezes|    264|        6|2014|
|37306|      Gilson Kleina|    

In [10]:
# Persist the 2014 players as Parquet. mode('overwrite') replaces a previous export
# of this year so the cell can be re-run; without a mode the write raises when the
# target path already exists.
jogadores_2014_ano.write.mode('overwrite').parquet('/cartola/clean/jogadores/2014')

In [11]:
# Read the players back from Parquet; the wildcard matches every directory
# under /cartola/clean/jogadores, not only 2014.
jogadores_2014_parquet = spark.read.parquet('/cartola/clean/jogadores/*')

In [12]:
# Preview the players read back from Parquet and display the resulting schema.
jogadores_2014_parquet.show()
jogadores_2014_parquet.schema


+-----+------------------+-------+---------+----+
|   ID|           Apelido|ClubeID|PosicaoID| ano|
+-----+------------------+-------+---------+----+
|51683|      Bruno Rangel|    315|        5|2014|
|51705|     Bruno Rodrigo|    283|        3|2014|
|51772|   Éverton Ribeiro|    283|        4|2014|
|51779|     Pedro Botelho|    282|        2|2014|
|51781|             Ávine|    265|        2|2014|
|51985|    Anderson Pedra|    292|        4|2014|
|70058|        Tiago Real|    290|        4|2014|
|70106|            Mouche|    275|        5|2014|
|70111|  Guilherme Mattis|    266|        3|2014|
|70116|    Fellipe Bastos|    284|        4|2014|
|70132|       Alan Santos|    277|        4|2014|
|70133|    Paulo Henrique|    275|        2|2014|
|71575|   Danilo Tarracha|    287|        2|2014|
|71581|         Léo Costa|    287|        4|2014|
|71593|   Henrique Mattos|    292|        3|2014|
|71602|        João Paulo|    262|        2|2014|
|71618|Anderson Conceição|    265|        3|2014|


StructType(List(StructField(ID,StringType,true),StructField(Apelido,StringType,true),StructField(ClubeID,StringType,true),StructField(PosicaoID,StringType,true),StructField(ano,IntegerType,true)))

In [13]:
# Convert the identifier columns to int (they are strings after the CSV load).
for id_column in ("ID", "ClubeID", "PosicaoID"):
    jogadores_2014_parquet = jogadores_2014_parquet.withColumn(
        id_column,
        jogadores_2014_parquet[id_column].cast(IntegerType()),
    )

In [14]:
# Display the schema to confirm the identifier columns are now integers.
jogadores_2014_parquet.schema

StructType(List(StructField(ID,IntegerType,true),StructField(Apelido,StringType,true),StructField(ClubeID,IntegerType,true),StructField(PosicaoID,IntegerType,true),StructField(ano,IntegerType,true)))

In [15]:
# Preview the players table.
jogadores_2014_parquet.show()

# Print the total number of rows.
print(jogadores_2014_parquet.count())

+-----+------------------+-------+---------+----+
|   ID|           Apelido|ClubeID|PosicaoID| ano|
+-----+------------------+-------+---------+----+
|51683|      Bruno Rangel|    315|        5|2014|
|51705|     Bruno Rodrigo|    283|        3|2014|
|51772|   Éverton Ribeiro|    283|        4|2014|
|51779|     Pedro Botelho|    282|        2|2014|
|51781|             Ávine|    265|        2|2014|
|51985|    Anderson Pedra|    292|        4|2014|
|70058|        Tiago Real|    290|        4|2014|
|70106|            Mouche|    275|        5|2014|
|70111|  Guilherme Mattis|    266|        3|2014|
|70116|    Fellipe Bastos|    284|        4|2014|
|70132|       Alan Santos|    277|        4|2014|
|70133|    Paulo Henrique|    275|        2|2014|
|71575|   Danilo Tarracha|    287|        2|2014|
|71581|         Léo Costa|    287|        4|2014|
|71593|   Henrique Mattos|    292|        3|2014|
|71602|        João Paulo|    262|        2|2014|
|71618|Anderson Conceição|    265|        3|2014|


## Partidas_IDS

In [None]:
# Load the 2014 match-ID CSV; the first row holds the column names.
partidas_ids_2014_df = spark.read.csv("/cartola/data/2014/2014_partidas_ids.csv", header=True)

In [None]:
# Add the season column: ano = 2014 on every row.
partidas_ids_2014_ano = partidas_ids_2014_df.withColumn('ano', lit(2014))

In [None]:
# Print up to 1000 rows for inspection.
partidas_ids_2014_ano.show(1000)

In [None]:
# Write the matches as Parquet, partitioned by the `ano` column, under a shared root directory.
# NOTE(review): no save mode is set, so re-running this cell will presumably fail once the
# path exists; a plain overwrite could also remove other years' partitions — confirm the
# intended mode before changing it.
partidas_ids_2014_ano.write.partitionBy('ano').parquet('/cartola/clean/partidas_ids')

In [None]:
# Read the partitioned match data back from Parquet.
partidas_ids_2014_parquet = spark.read.parquet('/cartola/clean/partidas_ids')

In [None]:
# Print up to 10000 rows, then display the schema read back from Parquet.
partidas_ids_2014_parquet.show(10000)
partidas_ids_2014_parquet.schema

In [None]:
# Cast the match columns to int (the CSV was loaded without schema inference).
# Each column is cast from itself: PlacarCasa was previously built from
# PlacarVisitante, which replaced the home score with the away score.
for int_column in ("ID", "Rodada", "Casa", "Visitante", "PlacarCasa", "PlacarVisitante"):
    partidas_ids_2014_parquet = partidas_ids_2014_parquet.withColumn(
        int_column,
        partidas_ids_2014_parquet[int_column].cast(IntegerType()),
    )

In [None]:
# Preview the matches table.
partidas_ids_2014_parquet.show()

# Print the total number of rows.
print(partidas_ids_2014_parquet.count())

## LANCES (Criar a tabela de scouts a partir da tabela lances)

In [None]:
# Load the 2014 plays ("lances") CSV; the first row holds the column names.
lances_2014_df = spark.read.csv("/cartola/data/2014/2014_lances.csv", header=True)

In [None]:
# Add the season column: ano = 2014 on every row.
lances_2014_ano = lances_2014_df.withColumn('ano', lit(2014))

In [None]:
# Print up to 1000 rows for inspection.
lances_2014_ano.show(1000)

In [None]:
# Write the plays as Parquet, partitioned by the `ano` column, under a shared root directory.
# NOTE(review): no save mode is set, so re-running this cell will presumably fail once the
# path exists; a plain overwrite could also remove other years' partitions — confirm the
# intended mode before changing it.
lances_2014_ano.write.partitionBy('ano').parquet('/cartola/clean/lances')

In [None]:
# Read the partitioned plays back from Parquet.
lances_2014_parquet = spark.read.parquet('/cartola/clean/lances')

In [None]:
# Print up to 10000 rows, then display the schema read back from Parquet.
lances_2014_parquet.show(10000)
lances_2014_parquet.schema

In [None]:
# Convert the identifier and timing columns to int, one column at a time.
for int_column in ("ID", "PartidaID", "ClubeID", "AtletaID", "Momento"):
    lances_2014_parquet = lances_2014_parquet.withColumn(
        int_column,
        lances_2014_parquet[int_column].cast(IntegerType()),
    )

In [None]:
# Preview the plays table.
lances_2014_parquet.show()

# Print the total number of rows.
print(lances_2014_parquet.count())

## scouts_raw

In [11]:
# Load the 2014 raw scouts CSV; the first row holds the column names.
scouts_raw_2014_df = spark.read.csv("/cartola/data/2014/2014_scouts_raw.csv", header=True)

In [12]:
# Add the season column: ano = 2014 on every row.
scouts_raw_2014_ano = scouts_raw_2014_df.withColumn('ano', lit(2014))

In [13]:
# Print up to 1000 rows for inspection.
scouts_raw_2014_ano.show(1000)

+------+------+-----+----------+-------+-----+------+-----------+-----+-------------+-------+-----+-------+-----------+-------------------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+
|Atleta|Rodada|Clube|Participou|Posicao|Jogos|Pontos|PontosMedia|Preco|PrecoVariacao|Partida|Mando|Titular|Substituido|        TempoJogado|Nota| FS| PE|  A| FT| FD| FF|  G|  I| PP| RB| FC| GC| CA| CV| SG| DD| DP| GS| ano|
+------+------+-----+----------+-------+-----+------+-----------+-----+-------------+-------+-----+-------+-----------+-------------------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+
| 36443|     0| null|         0|   null|    0|     0|          0|    9|            0|   null|    0|   null|          0|               null|null|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|2014|
| 36443|     1|  285|         1|      1|    1|     5|          5| 10.6|          1.6| 179879|    1|      1|     

In [14]:
# Drop every row whose round ("Rodada") is 0, keeping only rounds greater than zero.
rodada_valida = scouts_raw_2014_ano.Rodada > 0
scouts_raw_2014_ano_clean = scouts_raw_2014_ano[rodada_valida]

In [15]:
# Drop the rows of players who did not take part in the round: a row is kept when
# the player participated or when the price changed.
# PrecoVariacao is a string here (the CSV was loaded without schema inference) and holds
# fractional values such as 0.65, so it is cast to double explicitly rather than relying
# on Spark's implicit coercion of the string against the integer literal 0.
scouts_raw_2014_ano_clean = scouts_raw_2014_ano_clean[
    (scouts_raw_2014_ano_clean['Participou'] == True)
    | (scouts_raw_2014_ano_clean['PrecoVariacao'].cast('double') != 0)
]

In [16]:
# Print up to 10000 of the filtered rows for inspection.
scouts_raw_2014_ano_clean.show(10000)

+------+------+-----+----------+-------+-----+------+-----------+-----+-------------+-------+-----+-------+-----------+-------------------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+
|Atleta|Rodada|Clube|Participou|Posicao|Jogos|Pontos|PontosMedia|Preco|PrecoVariacao|Partida|Mando|Titular|Substituido|        TempoJogado|Nota| FS| PE|  A| FT| FD| FF|  G|  I| PP| RB| FC| GC| CA| CV| SG| DD| DP| GS| ano|
+------+------+-----+----------+-------+-----+------+-----------+-----+-------------+-------+-----+-------+-----------+-------------------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+
| 36443|     1|  285|         1|      1|    1|     5|          5| 10.6|          1.6| 179879|    1|      1|          0|                  1|   6|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|2014|
| 36443|     2|  285|         1|      1|    2|    -3|          1| 8.27|        -2.33| 179882|    0|      1|     

In [17]:
# Persist the cleaned 2014 scouts as Parquet. mode('overwrite') replaces a previous export
# of this year so the cell can be re-run; without a mode the write raises when the
# target path already exists.
scouts_raw_2014_ano_clean.write.mode('overwrite').parquet('/cartola/clean/scouts_raw/2014')

In [18]:
# Read the scouts back from Parquet; the wildcard matches every directory
# under /cartola/clean/scouts_raw, not only 2014.
scouts_raw_2014_parquet = spark.read.parquet('/cartola/clean/scouts_raw/*')

In [23]:
# Preview the first 10 rows as a pandas DataFrame (rendered as a table by the notebook).
#scouts_raw_2014_parquet.show(10000)
scouts_raw_2014_parquet.limit(10).toPandas()

Unnamed: 0,Atleta,Rodada,Clube,Participou,Posicao,Jogos,Pontos,PontosMedia,Preco,PrecoVariacao,...,RB,FC,GC,CA,CV,SG,DD,DP,GS,ano
0,36443,1,285,1,1,1,5.0,5.0,10.6,1.6,...,0,0,0,0,0,1,0,0,0,2014
1,36443,2,285,1,1,2,-3.0,1.0,8.27,-2.33,...,0,0,0,0,0,0,0,0,2,2014
2,36443,3,285,1,1,3,-2.6,-0.2,6.81,-1.46,...,0,0,0,0,0,0,0,0,1,2014
3,36443,4,285,1,1,4,4.0,0.85,7.96,1.15,...,0,0,0,0,0,0,2,0,1,2014
4,36443,5,285,1,1,5,5.0,1.68,8.61,0.65,...,0,0,0,0,0,1,0,0,0,2014
5,36443,6,285,1,1,6,1.2,1.6,8.02,-0.59,...,0,1,0,1,0,0,2,0,1,2014
6,36443,7,285,1,1,7,-3.3,0.9,6.75,-1.27,...,0,0,0,0,0,0,1,0,3,2014
7,36443,8,285,1,1,8,3.0,1.16,7.55,0.8,...,0,0,0,1,0,1,0,0,0,2014
8,36443,9,285,1,1,9,-1.5,0.86,6.71,-0.84,...,0,0,0,0,0,0,0,0,1,2014
9,36443,10,285,1,1,10,-4.0,0.37,6.01,-0.7,...,0,0,0,0,0,0,0,0,2,2014


In [24]:
# Display the schema read back from Parquet.
scouts_raw_2014_parquet.schema

StructType(List(StructField(Atleta,StringType,true),StructField(Rodada,StringType,true),StructField(Clube,StringType,true),StructField(Participou,StringType,true),StructField(Posicao,StringType,true),StructField(Jogos,StringType,true),StructField(Pontos,StringType,true),StructField(PontosMedia,StringType,true),StructField(Preco,StringType,true),StructField(PrecoVariacao,StringType,true),StructField(Partida,StringType,true),StructField(Mando,StringType,true),StructField(Titular,StringType,true),StructField(Substituido,StringType,true),StructField(TempoJogado,StringType,true),StructField(Nota,StringType,true),StructField(FS,StringType,true),StructField(PE,StringType,true),StructField(A,StringType,true),StructField(FT,StringType,true),StructField(FD,StringType,true),StructField(FF,StringType,true),StructField(G,StringType,true),StructField(I,StringType,true),StructField(PP,StringType,true),StructField(RB,StringType,true),StructField(FC,StringType,true),StructField(GC,StringType,true),Stru

In [25]:
#deve-se converter valores em BigDecimal?
scouts_raw_2014_parquet = scouts_raw_2014_parquet.withColumn("Atleta", scouts_raw_2014_parquet["Atleta"].cast(IntegerType()))
scouts_raw_2014_parquet = scouts_raw_2014_parquet.withColumn("Rodada", scouts_raw_2014_parquet["Rodada"].cast(IntegerType()))
scouts_raw_2014_parquet = scouts_raw_2014_parquet.withColumn("Clube", scouts_raw_2014_parquet["Clube"].cast(IntegerType()))
scouts_raw_2014_parquet = scouts_raw_2014_parquet.withColumn("Participou", scouts_raw_2014_parquet["Participou"].cast(IntegerType()))
scouts_raw_2014_parquet = scouts_raw_2014_parquet.withColumn("Posicao", scouts_raw_2014_parquet["Posicao"].cast(IntegerType()))
scouts_raw_2014_parquet = scouts_raw_2014_parquet.withColumn("Jogos", scouts_raw_2014_parquet["Jogos"].cast(IntegerType()))
scouts_raw_2014_parquet = scouts_raw_2014_parquet.withColumn("Partida", scouts_raw_2014_parquet["Partida"].cast(IntegerType()))
scouts_raw_2014_parquet = scouts_raw_2014_parquet.withColumn("Mando", scouts_raw_2014_parquet["Mando"].cast(IntegerType()))
scouts_raw_2014_parquet = scouts_raw_2014_parquet.withColumn("Titular", scouts_raw_2014_parquet["Titular"].cast(IntegerType()))
scouts_raw_2014_parquet = scouts_raw_2014_parquet.withColumn("Participou", scouts_raw_2014_parquet["Participou"].cast(IntegerType()))

In [26]:
# Print the row count first and end the cell with the preview: the notebook renders only
# a cell's last expression, so a preview placed before print() was computed and discarded.
#scouts_raw_2014_parquet.show()
print(scouts_raw_2014_parquet.count())
scouts_raw_2014_parquet.limit(10).toPandas()

11171


## DEMONSTRAÇÃO

In [None]:
# NOTE(review): this loads the players CSV into a "partidas" frame, and 'home_team' is not
# among the columns displayed for that file earlier in this notebook
# (ID, Apelido, ClubeID, PosicaoID) — confirm the intended input path.
partidas_2014_df = spark.read.csv("/cartola/data/2014/2014_jogadores.csv", header=True)
# Strip the ' - RJ' suffix from the home team name into a new `time` column.
partidas_2014_ct = partidas_2014_df.withColumn('time', regexp_replace('home_team', ' - RJ', ''))
# Add a lower-cased copy of the team name. show() returns None, so the DataFrame is
# assigned first and displayed in a separate statement.
final_partidas = partidas_2014_ct.withColumn('time_low', lower(col('time')))
final_partidas.show(truncate=False)

In [None]:
# Add the season column. The DataFrame is assigned before being displayed: chaining
# show() into the assignment stored None and made the following .show() call fail.
with_ano_partidas = partidas_2014_ct.withColumn('ano', lit(2014))
with_ano_partidas.show(truncate=False)

In [None]:
# TODO: add the command to overwrite an output file when it already exists.
# TODO: group the scouts by ID and count the rows (to detect duplicates).