In [None]:
# findspark.init() is called before `import pyspark` on purpose: per the
# findspark documentation it makes the local Spark installation importable,
# so the order of these statements matters.
import findspark
findspark.init()

import pyspark


In [None]:
# Obtain a local-mode Spark session named "Word Count" and keep a handle on
# its SparkContext for later cells.
# NOTE(review): "spark.some.config.option" looks like a placeholder setting —
# confirm whether it is actually needed.
spark = (
    pyspark.sql.SparkSession.builder
    .master("local")
    .appName("Word Count")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
sc = spark.sparkContext

### Projeto da Mozilla para monitoramento: https://github.com/mozilla/jupyter-spark

In [None]:
# Display the address of the Spark web UI for the running context.
sc.uiWebUrl # or spark.sparkContext.uiWebUrl

In [None]:
import os

# Directory that holds despesa.csv and receita.csv. The default is the original
# author's location, so behaviour is unchanged when the variable is not set;
# export TUTORIAL_DATASET_DIR to run the notebook on another machine.
DATASET_DIR = os.environ.get(
    "TUTORIAL_DATASET_DIR",
    "/Users/diogo.munaro/learn/tutorial-jupyter/dataset",
)

# header=True takes column names from the first line of each file.
# mode="DROPMALFORMED" makes the reader discard rows it cannot parse instead of
# failing, so the row counts below may be lower than the raw line counts.
df_d = spark.read.csv(DATASET_DIR + "/despesa.csv", header=True, mode="DROPMALFORMED")
df_r = spark.read.csv(DATASET_DIR + "/receita.csv", header=True, mode="DROPMALFORMED")

## Despesa

In [None]:
# Print the column names and types of the expenses ("despesa") DataFrame.
df_d.printSchema()

In [None]:
# Number of rows in the expenses DataFrame; rows discarded by the reader's
# DROPMALFORMED mode are not part of this count.
df_d.count()

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
def sanitize_valor(valor):
    """Parse a textual amount into a float after replacing spaces with dots.

    Every space in ``valor`` is replaced by "." before the text is handed to
    ``float``; for example "1234 56" becomes 1234.56.

    Args:
        valor: Raw cell text, or None when the cell is missing.

    Returns:
        The parsed float, or None when ``valor`` is None or an empty string.

    Raises:
        ValueError: If the text is still not a valid number after the
            replacement.
    """
    # A null cell arrives as None; calling .replace() on it would raise
    # AttributeError and abort the whole Spark job, so pass the null through.
    if valor is None or valor == "":
        return None
    return float(valor.replace(" ", "."))

# sanitize_valor returns a Python float, which is 64-bit. Declaring the UDF as
# FloatType would narrow every value to 32-bit single precision (about seven
# significant digits) and corrupt large monetary amounts, so DoubleType is used.
from pyspark.sql.types import DoubleType

udf_sanitize_valor = udf(sanitize_valor, DoubleType())

In [None]:
# Derive a numeric "total" column from the textual "Pago" column.
df_d_new = df_d.withColumn(
    "total",
    udf_sanitize_valor(df_d["Pago"]),
)

In [None]:
from pyspark.sql.functions import desc

# Sum the "total" column per (Acao, Descricao_Acao) pair and rank the pairs
# from the largest sum to the smallest.
gastos = (
    df_d_new
    .select("Acao", "Descricao_Acao", "total")
    .groupBy("Acao", "Descricao_Acao")
    .sum("total")
    .orderBy(desc("sum(total)"))
)
gastos.show(truncate=False)

In [None]:
# Only the first 20 rows of the ranking are converted to a pandas DataFrame.
top_20_gastos = gastos.limit(20)
gastos_pd = top_20_gastos.toPandas()

In [None]:
%matplotlib inline
gastos_pd.columns = ["Ação", "Descrição", "Total Gasto"]
gastos_pd.plot(x='Ação', y='Total Gasto', kind='bar')

In [None]:
# List every distinct (Acao, Descricao_Acao) pair without truncating long text.
acoes_distintas = df_d.select("Acao", "Descricao_Acao").distinct()
acoes_distintas.show(truncate=False)

## Receita

In [None]:
# Print the column names and types of the revenue ("receita") DataFrame.
df_r.printSchema()

In [None]:
import re

# Collect the revenue columns whose name begins with "RECEITA_ARRECADADA_".
# Pattern.match only matches at the start of the string, so names that merely
# contain the text further in are not selected.
r = re.compile('RECEITA_ARRECADADA_.*')
colunas = list(filter(r.match, df_r.columns))

In [None]:
# For each selected column add a parsed numeric twin named "TOTAL_<column>".
for nome_coluna in colunas:
    nome_total = 'TOTAL_%s' % nome_coluna
    df_r = df_r.withColumn(nome_total, udf_sanitize_valor(df_r[nome_coluna]))

In [None]:
# Parsed numeric version of the "RECEITA_ARRECADADA" column.
df_r = df_r.withColumn(
    'TOTAL_RECEITA_ARRECADADA',
    udf_sanitize_valor(df_r['RECEITA_ARRECADADA']),
)

In [None]:
# Parsed numeric version of the "RECEITA_PREVISTA" column.
df_r = df_r.withColumn(
    'TOTAL_RECEITA_PREVISTA',
    udf_sanitize_valor(df_r['RECEITA_PREVISTA']),
)

In [None]:
# Row-wise "total": the built-in sum() adds the TOTAL_* Column objects together
# with "+", producing a single Column expression.
colunas_totais = [df_r['TOTAL_%s' % nome] for nome in colunas]
df_r_new = df_r.withColumn('total', sum(colunas_totais))

In [None]:
# Preview ten rows of the computed total next to the collected and forecast totals.
(
    df_r_new
    .select('total', 'TOTAL_RECEITA_ARRECADADA', 'TOTAL_RECEITA_PREVISTA')
    .limit(10)
    .show()
)

In [None]:
# "defict" = computed total minus the forecast total, per row.
diferenca = df_r_new['total'] - df_r_new['TOTAL_RECEITA_PREVISTA']
df_r_defict = df_r_new.withColumn('defict', diferenca)

In [None]:
# Summary statistics for the three total columns.
(
    df_r_defict
    .select('total', 'TOTAL_RECEITA_ARRECADADA', 'TOTAL_RECEITA_PREVISTA')
    .describe()
    .show()
)

In [None]:
# groupBy() with no keys aggregates over the whole DataFrame, yielding the
# columns "sum(defict)" and "sum(TOTAL_RECEITA_ARRECADADA)".
df_r_defict_grouped = (
    df_r_defict
    .groupBy()
    .sum('defict', 'TOTAL_RECEITA_ARRECADADA')
)
df_r_defict_grouped.show(truncate=False)

In [None]:
# Flip the sign of the summed "defict" and store it as "defict_invert".
soma_defict = df_r_defict_grouped['sum(defict)']
df_r_defict_grouped_invert = df_r_defict_grouped.withColumn(
    'defict_invert',
    soma_defict * -1,
)
df_r_defict_grouped_invert.show()

In [None]:
# Keep only the two values to be charted and convert them to pandas.
df_r_defict_grouped_invert_pd = (
    df_r_defict_grouped_invert
    .select('defict_invert', 'sum(TOTAL_RECEITA_ARRECADADA)')
    .toPandas()
)

In [None]:
# Positional relabelling for display: 'defict_invert' becomes "Défict" and
# 'sum(TOTAL_RECEITA_ARRECADADA)' becomes "Arrecadado".
df_r_defict_grouped_invert_pd.columns = ["Défict", "Arrecadado"]

In [None]:
# Swap rows and columns so the two labels become the index.
df_r_transpose = df_r_defict_grouped_invert_pd.T

In [None]:
# Give the column an empty label; the one-element list requires the transposed
# frame to have exactly one column.
df_r_transpose.columns = [""]

In [None]:
# Pie chart of the transposed values, one subplot per column.
df_r_transpose.plot.pie(subplots=True)