In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import col, lower

In [None]:
spark = SparkSession.builder.master('yarn') \
    .config("spark.driver.memory","2g")\
    .config("spark.driver.cores","2")\
    .config("spark.executor.memory", "2g")\
    .config("spark.executor.cores", "2")\
    .config("spark.cores.max", "2")\
    .config("spark.jars.packages", "org.postgresql:postgresql:42.3.1")\
    .config('spark.sql.caseSensitive',True) \
    .appName('dataTreatment') \
    .getOrCreate()
spark

----

In [4]:
def toPostgres(df, table:str):
    df.write \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{url}:{port}/{db}") \
        .option("dbtable", table) \
        .option("driver", "org.postgresql.Driver") \
        .option("user", user) \
        .option("password", password) \
        .mode("append") \
        .save()

In [5]:
labels = [
    ('ano', IntegerType()),
    ('trim', IntegerType()),
    ('mes', IntegerType()),
    ('dataatendimento', StringType()),
    ('cod_regiao', IntegerType()),
    ('regiao', StringType()),
    ('uf', StringType()),
    ('cod_tipoatendimento', IntegerType()),
    ('descricaotipoatendimento', StringType()),
    ('cod_assunto', IntegerType()),
    ('descricaoassunto', StringType()),
    ('grupoassunto', StringType()),
    ('cod_problema', IntegerType()),
    ('descricaoproblema', StringType()),
    ('grupoproblema', StringType()),
    ('sexo', StringType()),
    ('faixaetariaconsumidor', StringType()),
    ('cepconsumidor', IntegerType())
]
schema = StructType([StructField (x[0], x[1], True) for x in labels])

In [None]:
df = spark.read \
    .option("recursiveFileLookup",True) \
    .option("header",True) \
    .option("sep",",") \
    .csv("/user/hadoop/datalake/reclamacoes/", schema=schema) \
    #.persist(StorageLevel.MEMORY_ONLY)
print("Número de linhas no nosso DataFrame -> {} .".format(df.count()))
print(f"Schema do nosso {type(df)} :",end='\n \n')
df.printSchema()


### - Imprimimos uma amostra do nosso df

In [None]:
df.sample(0.5).show(3, truncate=False)

### - Perceba que dependendo da quantidade de colunas há a quebra da tabela para a próxima linha, uma soluçao seria ->

In [None]:
df.sample(0.5).show(1, truncate=False, vertical=True)

### - Uma alternativa de exibição é possível no Jupyter usando Spark>=2.4.0 :

In [None]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True) # Ativamos a FLAG
spark.conf.set("spark.sql.repl.eagerEval.truncate", '0') # Setamos o limite de caracteres antes de truncar '0' deixa ilimitado
#spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 3) 
df.limit(3)

In [None]:
# Listamos ano, trimestre e mês das reclamaçoes. Perceba as 'n' maneiras de referenciar uma coluna ou range de colunas(slice)
df.select('ano',df['trim'], df[2]).distinct().orderBy(df.columns[0:3], ascending=True).show(3, vertical=False)

In [26]:
# Como o área da manufatura é bastante ampla, focaremos na industria que tenha relação com 'carros/auto'. Observamos aqui os assuntos relacionados
df.filter((lower(col('descricaoassunto')).like("%carr%")) | (lower(col('descricaoassunto')).like("%auto%"))).select("descricaoassunto").distinct()

descricaoassunto
Carro Nacional Zero ( Montadora )
Carro Usado
"Consórcio de Automóveis ou Automotores, Utilitários, Caminhonetes"
Carro Importado
"Automóvel - Locação / Assistência Automobilística ( Clube do Automóvel, Etc. ) / Auto Escola"
"Oficinas ( Mecânica, Funilaria, Auto-Elétrica, Borracharia )"
Seguro de Automóvel
"Combustível Automotivo ( Gasolina, Álcool, Diesel, Gás )"


In [None]:
# Dentro das nossas milhares de reclamação vamos filtar apenas as reclamações relacionadas a carro/auto como nosso nicho
df = df.filter((lower(col('descricaoassunto')).like("%carr%")) | (lower(col('descricaoassunto')).like("%auto%")))
print("Número de linhas no nosso DataFrame -> {} .".format(df.count()))
df.printSchema()

In [None]:
# Exportamos nosso dado para o nosso DataWarehouse PG
toPostgres(df,'reclamacao')