In [None]:
!curl ifconfig.me

In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from functools import reduce
import numpy as np
import re

In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = \
'--jars gs://scriptsnatalsoul/postgresql-42.3.1.jar pyspark-shell'


In [None]:
spark = SparkSession.builder.appName('InsercaoPostgre')\
.config('spark.sql.caseSensitive','True')\
.getOrCreate()
spark

In [None]:
url = '34.134.31.72'
db = 'telecomunicacoes'
port = '5432'
user = 'postgres'
password = 'Ox95F1eyft7LPBeN'

In [None]:
def loadData(table:str):
        df = spark.read \
        .format("jdbc") \
        .option("numPartitions", "4") \
        .option("fetchsize", "50000") \
        .option("url", f"jdbc:postgresql://{url}:{port}/{db}") \
        .option("dbtable", table) \
        .option("driver", "org.postgresql.Driver") \
        .option("user", user) \
        .option("password", password) \
        .load()
        return df

## --> 1. Tratamento de **dados_ibge**

In [None]:
dados_ibge = loadData('dados_ibge')
dados_ibge.printSchema()

In [None]:
print('numero de entradas ->',dados_ibge.count())
dados_ibge.select([count(when(isnan(c),c)).alias(c)for c in dados_ibge.columns]).show()

In [None]:
dados_ibge.show(3)

In [None]:
DFdi = dados_ibge.select(col('ano'),
                        col('id_municipio').alias('id_ibge'),
                        regexp_replace('id_municipio', r"(^.{1,2})" , "").alias('id_municipio').cast(IntegerType()),
                        substring('id_municipio',0,2).alias('id_uf').cast(IntegerType()),
                        col('pib').alias('pib_municipio')).filter( (col('ano') == 2017) | (col('ano') == 2018) )
print('numero de entradas ->',DFdi.count())
DFdi.show(3)

In [None]:
DFdi.printSchema()

## --> 2. Tratamento de **cidades_ibge**

In [None]:
cidades_ibge = loadData('cidades_ibge')
cidades_ibge.printSchema()

In [None]:
print('numero de entradas ->',cidades_ibge.count())
cidades_ibge.select([count(when(isnan(c),c)).alias(c)for c in cidades_ibge.columns]).show()

In [None]:
cidades_ibge = cidades_ibge.dropna()
cidades_ibge.count()

In [None]:
cidades_ibge = cidades_ibge.withColumn("COD__UF", (col('COD__UF')*100000))
cidades_ibge = cidades_ibge.withColumn('id_ibge', (col('COD__UF')+col('COD__MUNIC')))
DFci = cidades_ibge
DFci.show(3)

## --> 3. Fusão das tabelas tratadas **cidades_ibge** e **dados_ibge**

In [None]:
print(' ',DFdi.columns,'\n ',DFci.columns)

In [None]:
DFjoin = DFci.join(DFdi, ['id_ibge'], 'inner').drop(*['COD__MUNIC', 'COD__UF'])
DFjoin.count()

In [None]:
DFjoin.show(3)

In [None]:
print('numero de entradas ->',DFjoin.count())
DFjoin.select([count(when(isnan(c),c)).alias(c)for c in DFjoin.columns]).show()

In [None]:
DFjoin.printSchema()

In [None]:
DFjoin.show(3)

In [None]:
DFjoin = DFjoin.select(DFjoin[0].alias('cod_ibge'),
                DFjoin[1].alias('uf'),
                DFjoin[2].alias('nome_municipio'),
                regexp_replace(DFjoin[3], "\." , "").alias('populacao').cast(IntegerType()),                       
                DFjoin[4],
                DFjoin[5].alias('cod_municipio'),
                DFjoin[6].alias('cod_uf'),
                DFjoin[7].alias('pib_municipio'))

In [None]:
DFjoin.show(3)

In [None]:
DFjoin.printSchema()

In [None]:
DFjoin.write.mode("overwrite").parquet('gs://parquetcassandranatalsoul/Dados_ibge')

## --> 3. Tratamento de dados de qualidade

In [None]:
qualidade = loadData('qualidade')

In [None]:
qualidade.show(3)

In [None]:
qualidade.printSchema()

In [None]:
dfqld = qualidade.select(col('Servi_o').alias('servico'),
                         col('Empresa').alias('empresa'),
                         col('Ano').alias('ano'),
                         col('UF').alias('uf'),
                         col('Meta_do_Indicador').alias('meta_indicador'),
                         col('Cumpriu').alias('cumpriu'),
                         col('Descumpriu').alias('descumpriu'))
dfqld.show(3)

In [None]:
dfqld.printSchema()

In [None]:
dfqld.show(3)

In [None]:
dfqld.select([count(when(isnan(c),c)).alias(c)for c in dfqld.columns]).show()

In [None]:
dfqld.printSchema()

In [None]:
dfqld.write.mode("overwrite").parquet('gs://parquetcassandranatalsoul/Qualidade')

## --> 4. Tratamento de dados de cobertura

In [None]:
cobertura_operadoras = loadData('cobertura_operadoras')

In [None]:
dfco = cobertura_operadoras.select(col('Ano').alias('ano'), 
                                    col('Operadora').alias('operadora'),
                                    col('Tecnologia').alias('tecnologia'),
                                    col('Tipo_Setor').alias('setor'),
                                    substring("C_digo_Munic_pio",0,2).alias('cod_uf').cast(IntegerType()),
                                    regexp_replace("C_digo_Munic_pio", r"(^.{1,2})" , "").alias('cod_municipio').cast(IntegerType()),
                                    col('UF').alias('uf'),
                                    col('Regi_o').alias('regiao'),
                                    col('Domic_lios').alias('domicilios'),
                                    col('Moradores').alias('moradores'),
                                    col('Percentual_Cobertura').alias('percentual_cobertura'))
dfco.show(3)

In [None]:
dfco.printSchema()

In [None]:
dfco.write.mode("overwrite").parquet('gs://parquetcassandranatalsoul/Cobertura')

## --> 5. Tratamento de dados de reclamacoes

In [None]:
reclamacoes = loadData('reclamacoes')
reclamacoes.printSchema()

In [None]:
dfrecl = reclamacoes.select(col('Ano').alias('ano'),
                    col('M_s').alias('mes'),
                    col('UF').alias('uf'),
                    col('Cidade').alias('cidade'),
                    regexp_replace("CO_MUNICIPIO", r"(^.{1,2})" , "").alias('cod_municipio').cast(IntegerType()),
                    col('CanalEntrada').alias('canal'),
                    col('Marca').alias('marca'),
                    col('Assunto').alias('assunto')).filter((col('Ano')==2017) |
                          (col('Ano')==2018) |
                          (col('marca')=='TIM') |
                          (col('marca')=='OI'))


In [None]:
dfrecl.printSchema()

In [None]:
dfrecl.count()

In [None]:
dfrecl.write.mode("overwrite").parquet('gs://parquetcassandranatalsoul/Reclamacao')