In [1]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import Window
import pandas as pd

### Leitura de dados

In [2]:
read_path = '/home/tatiane/Downloads/exemplo_linkage_cases-brazil-cities-time.csv/tmp/exemplo_linkage_cases-brazil-cities-time.csv'

In [3]:
df = spark.read.csv(read_path, sep = ",", multiLine = True)

In [4]:
#renomeando colunas
header = ["id1", "id2", "city1", "city2", "regiao_saude1", "regiao_saude2", "codmun1", "codmun2", "date1", "date2", "epi_week1", "epi_week2", "score"]

for nome_antigo, novo_nome in zip(df.columns, header):
    df = df.withColumnRenamed(nome_antigo, novo_nome)

In [5]:
df.printSchema()

root
 |-- id1: string (nullable = true)
 |-- id2: string (nullable = true)
 |-- city1: string (nullable = true)
 |-- city2: string (nullable = true)
 |-- regiao_saude1: string (nullable = true)
 |-- regiao_saude2: string (nullable = true)
 |-- codmun1: string (nullable = true)
 |-- codmun2: string (nullable = true)
 |-- date1: string (nullable = true)
 |-- date2: string (nullable = true)
 |-- epi_week1: string (nullable = true)
 |-- epi_week2: string (nullable = true)
 |-- score: string (nullable = true)



### Filtrando dados

In [6]:
df.limit(5).toPandas()

Unnamed: 0,id1,id2,city1,city2,regiao_saude1,regiao_saude2,codmun1,codmun2,date1,date2,epi_week1,epi_week2,score
0,0,0,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200225,20200225,9,9,1.0
1,2,2,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200226,20200226,9,9,1.0
2,4,4,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200227,20200227,9,9,1.0
3,6,6,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200228,20200228,9,9,1.0
4,1,1,TOTAL,TOTAL,,,0,0,20200225,20200225,9,9,0.7237569060773481


In [7]:
#df = df.sample(0.25).cache()

In [7]:
df = df.limit(100).cache()

In [8]:
# Converter o tipo correto
df = df.withColumn('score', F.col('score').cast(DoubleType()))

In [9]:
#conferindo se mudou o tipo
df.printSchema()

root
 |-- id1: string (nullable = true)
 |-- id2: string (nullable = true)
 |-- city1: string (nullable = true)
 |-- city2: string (nullable = true)
 |-- regiao_saude1: string (nullable = true)
 |-- regiao_saude2: string (nullable = true)
 |-- codmun1: string (nullable = true)
 |-- codmun2: string (nullable = true)
 |-- date1: string (nullable = true)
 |-- date2: string (nullable = true)
 |-- epi_week1: string (nullable = true)
 |-- epi_week2: string (nullable = true)
 |-- score: double (nullable = true)



In [10]:
# Descrição dos dados
df.select('score').summary().toPandas().set_index('summary') #.T #transforma coluna em linhas para facilitar a visão da tabela.

Unnamed: 0_level_0,score
summary,Unnamed: 1_level_1
count,100.0
mean,0.9088397790055244
stddev,0.1305473929904035
min,0.7237569060773481
25%,0.7237569060773481
50%,1.0
75%,1.0
max,1.0


In [11]:
# Distribuição dos dados
df.groupBy(F.round('score', 1)).count().show()

+---------------+-----+
|round(score, 1)|count|
+---------------+-----+
|            1.0|   67|
|            0.7|   33|
+---------------+-----+



### Extraindo dados de colunas
* Dia, mes, ano
* Extraindo o estado do municipio

In [12]:
df.limit(10).toPandas()

Unnamed: 0,id1,id2,city1,city2,regiao_saude1,regiao_saude2,codmun1,codmun2,date1,date2,epi_week1,epi_week2,score
0,0,0,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200225,20200225,9,9,1.0
1,2,2,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200226,20200226,9,9,1.0
2,4,4,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200227,20200227,9,9,1.0
3,6,6,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200228,20200228,9,9,1.0
4,1,1,TOTAL,TOTAL,,,0,0,20200225,20200225,9,9,0.723757
5,5,5,TOTAL,TOTAL,,,0,0,20200227,20200227,9,9,0.723757
6,3,3,TOTAL,TOTAL,,,0,0,20200226,20200226,9,9,0.723757
7,8,8,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200229,20200229,9,9,1.0
8,7,7,TOTAL,TOTAL,,,0,0,20200228,20200228,9,9,0.723757
9,10,10,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200301,20200301,10,10,1.0


In [13]:
#criando colunas dia, mês e ano, extraindo-as da coluna date1, começando em tal numero, pegando x casas.
df = df.withColumn('dia1', F.substring('date1', 7, 2))
df = df.withColumn('mes1', F.substring('date1', 5, 2))
df = df.withColumn('ano1', F.substring('date1', 1, 4))

In [14]:
df.select('date1', 'ano1', 'mes1', 'dia1').limit(5).toPandas()

Unnamed: 0,date1,ano1,mes1,dia1
0,20200225,2020,2,25
1,20200226,2020,2,26
2,20200227,2020,2,27
3,20200228,2020,2,28
4,20200225,2020,2,25


In [15]:
df = df.withColumn('dia2', F.substring('date2', 7, 2))
df = df.withColumn('mes2', F.substring('date2', 5, 2))
df = df.withColumn('ano2', F.substring('date2', 1, 4))

In [16]:
df.limit(5).toPandas()

Unnamed: 0,id1,id2,city1,city2,regiao_saude1,regiao_saude2,codmun1,codmun2,date1,date2,epi_week1,epi_week2,score,dia1,mes1,ano1,dia2,mes2,ano2
0,0,0,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200225,20200225,9,9,1.0,25,2,2020,25,2,2020
1,2,2,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200226,20200226,9,9,1.0,26,2,2020,26,2,2020
2,4,4,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200227,20200227,9,9,1.0,27,2,2020,27,2,2020
3,6,6,São Paulo/SP,São Paulo/SP,São Paulo,São Paulo,355030,355030,20200228,20200228,9,9,1.0,28,2,2020,28,2,2020
4,1,1,TOTAL,TOTAL,,,0,0,20200225,20200225,9,9,0.723757,25,2,2020,25,2,2020


In [17]:
# Extrair o estado do cod do municipio
df = df.withColumn('Estado1', F.substring('codmun1', 1, 2))
df = df.withColumn('Estado2', F.substring('codmun2', 1, 2))

In [18]:
df.select('codmun1', 'Estado1').limit(5).toPandas()

Unnamed: 0,codmun1,Estado1
0,355030,35
1,355030,35
2,355030,35
3,355030,35
4,0,0


### Operações entre colunas 
* +, -, *, /

In [19]:
# Soma de colunas
df.select('dia1', 'mes1', (F.col('dia1') + F.col('mes1')).alias('soma')).show()

+----+----+----+
|dia1|mes1|soma|
+----+----+----+
|  25|  02|27.0|
|  26|  02|28.0|
|  27|  02|29.0|
|  28|  02|30.0|
|  25|  02|27.0|
|  27|  02|29.0|
|  26|  02|28.0|
|  29|  02|31.0|
|  28|  02|30.0|
|  01|  03| 4.0|
|  02|  03| 5.0|
|  03|  03| 6.0|
|  29|  02|31.0|
|  02|  03| 5.0|
|  03|  03| 6.0|
|  01|  03| 4.0|
|  05|  03| 8.0|
|  04|  03| 7.0|
|  04|  03| 7.0|
|  05|  03| 8.0|
+----+----+----+
only showing top 20 rows



In [20]:
# Multiplicação e soma de colunas
df.select('dia1', 'mes1', (F.col('dia1') + (F.col('mes1') - F.lit(1)) * 30).alias('soma')).show()

+----+----+----+
|dia1|mes1|soma|
+----+----+----+
|  25|  02|55.0|
|  26|  02|56.0|
|  27|  02|57.0|
|  28|  02|58.0|
|  25|  02|55.0|
|  27|  02|57.0|
|  26|  02|56.0|
|  29|  02|59.0|
|  28|  02|58.0|
|  01|  03|61.0|
|  02|  03|62.0|
|  03|  03|63.0|
|  29|  02|59.0|
|  02|  03|62.0|
|  03|  03|63.0|
|  01|  03|61.0|
|  05|  03|65.0|
|  04|  03|64.0|
|  04|  03|64.0|
|  05|  03|65.0|
+----+----+----+
only showing top 20 rows



In [21]:
# Divisão
df.select('codmun1', (F.col('codmun1') / 10000).cast(IntegerType()).alias('xx')).show()
# Nesse código estamos selecionando a varivavel codmun, dividindo seu valor por 10 mil, transformando seu valor em inveito para tirar as sobras, dando nome para coluna e pedindo para exibir.

+-------+---+
|codmun1| xx|
+-------+---+
| 355030| 35|
| 355030| 35|
| 355030| 35|
| 355030| 35|
|      0|  0|
|      0|  0|
|      0|  0|
| 355030| 35|
|      0|  0|
| 355030| 35|
| 355030| 35|
| 355030| 35|
|      0|  0|
|      0|  0|
|      0|  0|
|      0|  0|
| 355030| 35|
| 355030| 35|
|      0|  0|
|      0|  0|
+-------+---+
only showing top 20 rows



### Comparando colunas
* Registros que têm TODAS as colunas de linkage iguais (menos os ids) e o score diferente de 1
* Registros que têm ALGUMA coluna do linkage diferente (menos os ids) e o score igual de 1
  * DICA: Usar `.filter` com as devidas condições

In [22]:
df = df.withColumn('city', F.when(F.col('city1') == F.col('city2'), 1).otherwise(0))
df = df.withColumn('regiao_saude', F.when(F.col('regiao_saude1') == F.col('regiao_saude2'), 1).otherwise(0))
df = df.withColumn('date', F.when(F.col('date1') == F.col('date2'), 1).otherwise(0))
df = df.withColumn('epi_week', F.when(F.col('epi_week1') == F.col('epi_week2'), 1).otherwise(0))
df = df.withColumn('codmun', F.when(F.col('codmun1') == F.col('codmun2'), 1).otherwise(0))

In [23]:
df.filter(df.city == 1).count()

100

In [28]:
# Registros que têm TODAS as colunas de linkage iguais (menos os ids) e o score diferente de 1
# Ao colocar .show() mostra o cabeçalho com algumas linhas em branco.
df.filter((F.col('city') == 1) & (F.col('regiao_saude') == 1) & (F.col('date') == 1) & (F.col('epi_week') == 1) & (F.col('codmun') == 1) & (F.col('score') != 1)).count()

0

In [35]:
# Registros que têm ALGUMA coluna do linkage diferente (menos os ids) e o score igual de 1
# para cidade com score diferente de 1
df.filter((F.col('score') == 1) & (F.col('city') != 1) & (F.col('regiao_saude') == 1) & (F.col('date') == 1) & (F.col('epi_week') == 1) & (F.col('codmun') == 1)).count()

# para regiao_saude com score diferente de 1
df.filter((F.col('score') == 1) & (F.col('city') == 1) & (F.col('regiao_saude') != 1) & (F.col('date') == 1) & (F.col('epi_week') == 1) & (F.col('codmun') == 1)).count()

# para date com score diferente de 1
df.filter((F.col('score') == 1) & (F.col('city') == 1) & (F.col('regiao_saude') == 1) & (F.col('date') != 1) & (F.col('epi_week') == 1) & (F.col('codmun') == 1)).count()

# para epi_week com score diferente de 1
df.filter((F.col('score') == 1) & (F.col('city') == 1) & (F.col('regiao_saude') == 1) & (F.col('date') == 1) & (F.col('epi_week') != 1) & (F.col('codmun') == 1)).count()

# para codmun com score diferente de 1
df.filter((F.col('score') == 1) & (F.col('city') == 1) & (F.col('regiao_saude') == 1) & (F.col('date') == 1) & (F.col('epi_week') == 1) & (F.col('codmun') != 1)).count()

0