## Exemplo de Uso do Apache Spark com .NET (cenário Batch)

In [1]:
// Baixando e referenciando a biblioteca do .NET for Apache Spark
#r "nuget:Microsoft.Spark, 1.0.0"

In [1]:
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using System;
using System.Collections.Generic;
using static Microsoft.Spark.Sql.Functions; // Para acessar as funções estáticas Col, RegexpReplace, etc.

In [1]:
// Obtém a referência ao contexto de execução do Spark
SparkSession spark = SparkSession
    .Builder()
    .AppName("Exemplo Batch")
    .GetOrCreate();

In [1]:
// Definindo um schema fixo, com os nomes de coluna que eu quero e seus tipos
StructType schema = new StructType(new[]
    {
        new StructField("MES_REFERENCIA", new StringType()),
        new StructField("MES_COMPETENCIA", new StringType()),
        new StructField("UF", new StringType()),
        new StructField("CODIGO_MUNICIPIO", new IntegerType()),
        new StructField("MUNICIPIO", new StringType()),
        new StructField("CODIGO_FAVORECIDO", new StringType()),
        new StructField("NOME", new StringType()),
        new StructField("DATA_SAQUE", new DateType()),
        new StructField("VALOR_TEXTO", new StringType())
    });

In [1]:
// Leitura dos dados em disco para dentro do Spark
DataFrame df = spark.Read()
    .Format("csv")
    .Schema(schema)
    .Option("sep", ";")
    .Option("header", true)
    .Option("dateFormat", "dd/MM/yyyy")
    .Load(@"D:\Projetos\SparkNETDemo\src\BatchDemo\data\amostra.csv");
df.PrintSchema();
df.Show(5, 10);

root
 |-- MES_REFERENCIA: string (nullable = true)
 |-- MES_COMPETENCIA: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- CODIGO_MUNICIPIO: integer (nullable = true)
 |-- MUNICIPIO: string (nullable = true)
 |-- CODIGO_FAVORECIDO: string (nullable = true)
 |-- NOME: string (nullable = true)
 |-- DATA_SAQUE: date (nullable = true)
 |-- VALOR_TEXTO: string (nullable = true)



+--------------+---------------+---+----------------+----------+-----------------+----------+----------+-----------+
|MES_REFERENCIA|MES_COMPETENCIA| UF|CODIGO_MUNICIPIO| MUNICIPIO|CODIGO_FAVORECIDO|      NOME|DATA_SAQUE|VALOR_TEXTO|
+--------------+---------------+---+----------------+----------+-----------------+----------+----------+-----------+
|        201910|         201910| AC|             643|ACRELANDIA|       1616761...|ABIGAIL...|2019-10-24|     171,00|
|        201910|         201910| AC|             643|ACRELANDIA|       2122146...|ADALCIL...|2019-10-31|     346,00|
|        201910|         201910| AC|             643|ACRELANDIA|       1612006...|ADALGIZ...|2019-10-28|     178,00|
|        201910|         201910| AC|             643|ACRELANDIA|       2120822...|ADEIDE ...|2019-10-24|     334,00|
|        201910|         201910| AC|             643|ACRELANDIA|       1603692...|ADELINA...|2019-10-21|     457,00|
+--------------+---------------+---+----------------+----------+

In [1]:
// Removendo colunas que não precisamos mais
df = df.Drop("MES_REFERENCIA")
    .Drop("MES_COMPETENCIA")
    .Drop("CODIGO_MUNICIPIO")
    .Drop("CODIGO_FAVORECIDO");
df.Show(5, 10);

+---+----------+----------+----------+-----------+
| UF| MUNICIPIO|      NOME|DATA_SAQUE|VALOR_TEXTO|
+---+----------+----------+----------+-----------+
| AC|ACRELANDIA|ABIGAIL...|2019-10-24|     171,00|
| AC|ACRELANDIA|ADALCIL...|2019-10-31|     346,00|
| AC|ACRELANDIA|ADALGIZ...|2019-10-28|     178,00|
| AC|ACRELANDIA|ADEIDE ...|2019-10-24|     334,00|
| AC|ACRELANDIA|ADELINA...|2019-10-21|     457,00|
+---+----------+----------+----------+-----------+
only showing top 5 rows



In [1]:
// Convertendo a coluna VALOR de string para decimal, considerando que o padrão brasileiro é diferente do americano
df = df.WithColumn("VALOR", RegexpReplace(
                                        RegexpReplace(Col("VALOR_TEXTO"), "\\.", "")
                                        , ",", ".")
        .Cast("decimal(10,2)"))
.Drop("VALOR_TEXTO");
df.PrintSchema();
df.Show(5, 10);

root
 |-- UF: string (nullable = true)
 |-- MUNICIPIO: string (nullable = true)
 |-- NOME: string (nullable = true)
 |-- DATA_SAQUE: date (nullable = true)
 |-- VALOR: decimal(10,2) (nullable = true)



+---+----------+----------+----------+------+
| UF| MUNICIPIO|      NOME|DATA_SAQUE| VALOR|
+---+----------+----------+----------+------+
| AC|ACRELANDIA|ABIGAIL...|2019-10-24|171.00|
| AC|ACRELANDIA|ADALCIL...|2019-10-31|346.00|
| AC|ACRELANDIA|ADALGIZ...|2019-10-28|178.00|
| AC|ACRELANDIA|ADEIDE ...|2019-10-24|334.00|
| AC|ACRELANDIA|ADELINA...|2019-10-21|457.00|
+---+----------+----------+----------+------+
only showing top 5 rows



In [1]:
// Efetuando um filtro em cima dos dados
df = df.Where(df.Col("UF").NotEqual("AC"));
//df = df.Where("UF <> 'AC'");  // passar uma expressão WHERE também funciona como filtro
df.Show(5, 10);

+---+----------+----------+----------+------+
| UF| MUNICIPIO|      NOME|DATA_SAQUE| VALOR|
+---+----------+----------+----------+------+
| RJ|SAO GON...|NICOLY ...|2019-10-22|246.00|
| RJ|SAO GON...|NIDEIA ...|2019-11-08| 89.00|
| RJ|SAO GON...|NIDIA P...|2019-10-22|268.00|
| RJ|SAO GON...|NIEDJA ...|2019-10-30|178.00|
| RJ|SAO GON...|NIEDJA ...|2019-10-21|130.00|
+---+----------+----------+----------+------+
only showing top 5 rows



In [1]:
// Efetua uma agregação dos dados (somar valores pagos por município)
DataFrame somatorio = df.GroupBy("UF", "MUNICIPIO")
    .Sum("VALOR")
    .WithColumnRenamed("sum(VALOR)", "SOMA_BENEFICIOS");
somatorio
.OrderBy(somatorio.Col("SOMA_BENEFICIOS").Desc())
.Show(15, 40);

+---+------------------------+---------------+
| UF|               MUNICIPIO|SOMA_BENEFICIOS|
+---+------------------------+---------------+
| RJ|             SAO GONCALO|     1738096.00|
| SP|              SAO CARLOS|     1267911.00|
| SP|   SAO BERNARDO DO CAMPO|      173999.00|
| SP|      SAO CAETANO DO SUL|      130793.00|
| SP|   SAO JOAO DA BOA VISTA|      105278.00|
| SP|    SAO JOAQUIM DA BARRA|       46401.00|
| SP|  SAO JOAO DO PAU D'ALHO|       17808.00|
| SP|           SAO FRANCISCO|       11138.00|
| SP|SAO JOAO DAS DUAS PONTES|        7563.00|
| SP|     SAO JOAO DE IRACEMA|        7164.00|
+---+------------------------+---------------+



In [1]:
var propriedades = new Dictionary<string, string>()
{
    { "user", "spark_user" },
    { "password", "my-secret-password" }
};
// Salvando em banco de dados com funcionalidade nativa do Spark
somatorio
    .Write()
    .Mode(SaveMode.Overwrite)
    .Option("driver", "com.mysql.cj.jdbc.Driver")
    .Jdbc("jdbc:mysql://localhost:3306/teste_spark", "beneficios", propriedades);