In [5]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import split, col 
from os.path import abspath


+---+--------------+-----------+--------------+---------+---------+
| ID|CHAVE_SITUACAO|CLASS_RISCO|   CAT_CLIENTE|PAGAMENTO|CATEGORIA|
+---+--------------+-----------+--------------+---------+---------+
|  1|          32FC|      cinza|   Basic-Alpha|        1|        C|
|  2|          25MV|    Amarelo|         Black|        1|        A|
|  3|          27MV|    Amarelo|    Basic-Beta|        1|        B|
|  4|          26FD|    Amarelo|         Black|        0|        B|
|  5|          26FD|    Amarelo|         Black|        0|        C|
|  6|          28FC|    Amarelo|Platinum-Alpha|        0|        C|
|  7|          27MD|      Verde| Platinum-Beta|        1|        A|
|  8|          31MD|      Cinza|         Basic|        0|        C|
|  9|          28FS|      Cinza|         Black|        1|        A|
| 10|          31MV|    Amarelo|      Platinum|        1|        C|
| 11|          29MV|    Amarelo|         Basic|        1|        A|
| 12|          30FC|      Cinza|    Basic-Beta| 

In [None]:
# Abrindo uma sessão Spark
spark = (SparkSession.builder
         .appName("Estudando PySpark")
         .enableHiveSupport() 
         .getOrCreate())


In [None]:
# Criando um schema para definir os tipos de preenchimento para o df
schema = StructType([ \
    StructField("ID", IntegerType(), True), \
    StructField("CHAVE_SITUACAO", StringType(), True), \
    StructField("CLASS_RISCO",StringType(),True), \
    StructField("CAT_CLIENTE",StringType(),True), \
    StructField("PAGAMENTO",IntegerType(),True),  \
    StructField("CATEGORIA",StringType(),True), \
  ])


In [None]:
# Criando um df a partir de um arquivo csv
df = (spark.read 
    .options(header=True, delimiter=",")
    .schema(schema)
    .csv("gs://official-bucket-01/ChavesClientes-_1_.csv"))


In [None]:
# Tratando as colunas na tabela
df = df.select("ID", "CHAVE_SITUACAO", "CLASS_RISCO", "CAT_CLIENTE", "PAGAMENTO", F.regexp_replace(F.col("CLASS_RISCO"), "[-+]", "").alias("CLASS_RISCO_REPLACE"))
df = df.withColumn("CATEGORIA", df.CLASS_RISCO.substr(0,1))
df = df.withColumn("CLASS_RISCO", df.CLASS_RISCO_REPLACE.substr(2,30))
df = df.select("ID", "CHAVE_SITUACAO", "CLASS_RISCO", "CAT_CLIENTE", "PAGAMENTO", "CATEGORIA")


In [7]:
df.show()


+---+--------------+-----------+--------------+---------+---------+
| ID|CHAVE_SITUACAO|CLASS_RISCO|   CAT_CLIENTE|PAGAMENTO|CATEGORIA|
+---+--------------+-----------+--------------+---------+---------+
|  1|          32FC|      cinza|   Basic-Alpha|        1|        C|
|  2|          25MV|    Amarelo|         Black|        1|        A|
|  3|          27MV|    Amarelo|    Basic-Beta|        1|        B|
|  4|          26FD|    Amarelo|         Black|        0|        B|
|  5|          26FD|    Amarelo|         Black|        0|        C|
|  6|          28FC|    Amarelo|Platinum-Alpha|        0|        C|
|  7|          27MD|      Verde| Platinum-Beta|        1|        A|
|  8|          31MD|      Cinza|         Basic|        0|        C|
|  9|          28FS|      Cinza|         Black|        1|        A|
| 10|          31MV|    Amarelo|      Platinum|        1|        C|
| 11|          29MV|    Amarelo|         Basic|        1|        A|
| 12|          30FC|      Cinza|    Basic-Beta| 

In [3]:

df.write.insertInto('clientes.chaves_clientes', overwrite=False)



In [4]:
df_hive = spark.sql("select * from clientes.chaves_clientes")
print(df.schema)
df.show()

StructType(List(StructField(ID,IntegerType,true),StructField(CHAVE_SITUACAO,StringType,true),StructField(CLASS_RISCO,StringType,true),StructField(CAT_CLIENTE,StringType,true),StructField(PAGAMENTO,IntegerType,true),StructField(CATEGORIA,StringType,true)))
+---+--------------+-----------+--------------+---------+---------+
| ID|CHAVE_SITUACAO|CLASS_RISCO|   CAT_CLIENTE|PAGAMENTO|CATEGORIA|
+---+--------------+-----------+--------------+---------+---------+
|  1|          32FC|      cinza|   Basic-Alpha|        1|        C|
|  2|          25MV|    Amarelo|         Black|        1|        A|
|  3|          27MV|    Amarelo|    Basic-Beta|        1|        B|
|  4|          26FD|    Amarelo|         Black|        0|        B|
|  5|          26FD|    Amarelo|         Black|        0|        C|
|  6|          28FC|    Amarelo|Platinum-Alpha|        0|        C|
|  7|          27MD|      Verde| Platinum-Beta|        1|        A|
|  8|          31MD|      Cinza|         Basic|        0|       