In [None]:
#Iniciando o Spark no EMR
#spark

In [None]:
#Importando bibliotecas
import json
from datetime import timedelta, datetime

#Importação da biblioteca PySpark para processamento de dados em larga escala
from pyspark.sql import SparkSession, Row
from decimal import Decimal
from pyspark.sql.functions import lit, col, expr, to_timestamp, format_number,udf, when
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DoubleType, IntegerType, TimestampType, DateType, ArrayType, DecimalType

In [None]:
#Inicializando uma sessao spark
spark = SparkSession.builder.appName('Nubank').getOrCreate()

## DataFrames

#### Montando a tabela de Transacoes

In [None]:
#transformando json em dataframe
compras = spark.read.json('s3://nubank-api-dependencias/JSONS/compras.json')

In [None]:
# Converte a coluna 'time' para o formato Timestamp e divide a coluna 'amount' por 100
compras = compras.withColumn('time', to_timestamp(compras['time'], 'yyyy-MM-dd\'T\'HH:mm:ssX'))
compras = compras.withColumn('amount', col("amount") / 100)

In [None]:
# Adicionar uma coluna vazia ao DataFrame df_transacoes para corresponder ao DataFrame df_parcelas
compras = compras.withColumn("qtd_parcelas", lit(None).cast("integer"))
compras = compras.withColumn("parcela", lit(None).cast("integer"))

In [None]:
#Removendo colunas desnecessarios
compras = compras.drop(*['_links','account','amount_without_iof','details','href','id','source','tokenized'])

In [None]:
#Schema do DataFrame
schema_ = StructType([
    StructField('amount', DoubleType(), True),
    StructField('category', StringType(), True),
    StructField('description', StringType(), True),
    StructField('time', TimestampType(), True),
    StructField('title', StringType(), True),
    StructField('qtd_parcelas', IntegerType(), True),
    StructField('parcela', IntegerType(), True)
])

In [None]:
#Criando o DataFrame de parcelas
parcelas = spark.read.json('s3://nubank-api-dependencias/JSONS/parcelas.json',schema_)

In [None]:
# Converte a coluna 'time' para o formato Timestamp
parcelas = parcelas.withColumn('time', to_timestamp(parcelas['time'], 'yyyy-MM-dd\'T\'HH:mm:ssX'))

In [None]:
parcelas = parcelas.withColumn("parcela", col("parcela").cast("integer"))
parcelas = parcelas.withColumn("qtd_parcelas", col("qtd_parcelas").cast("integer"))

In [None]:
#Unindo os DataFrames compras e parcelas
transacoes = compras.union(parcelas)

In [None]:
# Substituir os nulls por 0 na coluna "qtd_parcelas"
transacoes = transacoes.fillna(0, subset=["qtd_parcelas"])

# Substituir os nulls por 0 na coluna "parcela"
transacoes = transacoes.fillna(0, subset=["parcela"])

In [None]:
#Convertendo o DataFrame em parquet
transacoes.write.\
parquet('s3://nubank-api-dependencias/Tabelas/transacoes/',mode='overwrite')

#### Limite

In [None]:
# Transformando o arquivo Json em DataFrame
limite = spark.read.json('s3://nubank-api-dependencias/JSONS/limite.json')

In [None]:
# Converte a coluna 'Data' para o formato Timestamp
limite = limite.withColumn('Data', to_timestamp(limite['Data'], 'yyyy:MM:dd HH:mm:ss'))

In [None]:
#Nome parquet
limite_name = datetime.now().strftime('%Y:%m:%d %H:%M:%S')
limite_parquet_name = 'limite-'+limite_name

In [None]:
#Convertendo o DataFrame em parquet
limite.write.parquet(f's3://nubank-api-dependencias/Tabelas/limite/{limite_parquet_name}.parquet')

#### Saldo

In [None]:
# Transformando o arquivo Json em DataFrame
saldo = spark.read.json('s3://nubank-api-dependencias/JSONS/saldo.json')

In [None]:
# Converte a coluna 'Data' para o formato Timestamp 
saldo = saldo.withColumn('Data', to_timestamp(saldo['Data'], 'yyyy:MM:dd HH:mm:ss'))

In [None]:
#Nome parquet
saldo_name = datetime.now().strftime('%Y:%m:%d %H:%M:%S')
saldo_parquet_name = 'saldo-'+saldo_name

In [None]:
#Convertendo o DataFrame em parquet
saldo.write.parquet(f's3://nubank-api-dependencias/Tabelas/saldo/{saldo_parquet_name}.parquet')

#### Despesas

In [None]:
#Schema do dataframe Despesas
schemas = StructType([
    StructField('postDate', StringType(), True),
    StructField('title', StringType(), True),
    StructField('amount', FloatType(), True)
])

In [None]:
# Transformando o arquivo Json em DataFrame
despesas = spark.read.json('s3://nubank-api-dependencias/JSONS/despesas.json',schemas)

In [None]:
# Converte a coluna 'postDate' para o formato Timestamp 
despesas = despesas.withColumn('postDate', to_timestamp(despesas['postDate'], 'yyyy-MM-dd'))

In [None]:
#Convertendo o DataFrame em parquet
despesas.write.\
parquet('s3://nubank-api-dependencias/Tabelas/despesas/', mode='overwrite')

#### Receitas

In [None]:
# Transformando o arquivo Json em DataFrame
receitas = spark.read.json('s3://nubank-api-dependencias/JSONS/receitas.json',schemas)

In [None]:
# Converte a coluna 'postDate' para o formato Timestamp 
receitas = receitas.withColumn('postDate', to_timestamp(receitas['postDate'], 'yyyy-MM-dd'))

In [None]:
#Convertendo o DataFrame em parquet
receitas.write.\
parquet('s3://nubank-api-dependencias/Tabelas/receitas/', mode='overwrite')