In [None]:
'''
A tecnologia escolhida foi HDFS para armazenar os arquivos, PySpark (Jupyter) para ingestão de dados e processamento e armazenamento no MySql.
A razão por se escolher o Py - Spark é de ser multi-funções: 
   * podemos executar comandos python;
   * fazer ingestão de dados no spark;
   * os selects no dataframe fazem o map-reduce (sou mais familirizado com SQL);
   * a possibilidade de ja armazenar os dados diretamente;

1) Baixar arquivos do link e armazenar no HDFS;
2) Pode se tratar antes os arquivos e conteúdo com o python (porem neste caso tratei direto no Dataframe)
3) Ler arquivos CSV / JSON no Spark em um DataFrame;
4) Modificar o nome das colunas, removendo espaços e acentos (pyspark);
5) Modificar coluna de valor trocando virgula por ponto (pyspark);
6) Criar uma tempview no spark a partir do daframe ja tratado, filtrando somente salario maternidade;
7) Fazer um select com group by por Ano e obtendo a o SUM(qtd de beneficios) e SUM(valor), guardando num dataframe;
8) Criar database e tabela no MySQL com as colunas solicitadas no exercicio: indicador, ano, valor_medio (mesmo nome das colunas do dataframe);
9) Conectar no MySQL e armazenar o Dataframe na tabela criada (cheguei até esse ponto, erro na configuração do JDBC)
'''

In [33]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ex1').getOrCreate()

In [34]:
dfSalMater = spark.read.csv('hdfs:///user/hduser/spark/project/MAN01.csv',header=True)

In [35]:
dfSalMater.count()

17555

In [36]:
import re
dfSalMater = dfSalMater.toDF(*(re.sub(r'[\.\s]+', '_', c) for c in dfSalMater.columns))
dfSalMater = dfSalMater.toDF(*(re.sub(r'í', 'i', c) for c in dfSalMater.columns))
dfSalMater = dfSalMater.toDF(*(re.sub(r'é', 'e', c) for c in dfSalMater.columns))
dfSalMater = dfSalMater.toDF(*(re.sub(r'çã', 'ca', c) for c in dfSalMater.columns))
dfSalMater = dfSalMater.toDF(*(re.sub(r'_\(R\$\)', '', c) for c in dfSalMater.columns))
dfSalMater = dfSalMater.toDF(*(re.sub(r'/', '_', c) for c in dfSalMater.columns))
dfSalMater.printSchema()


root
 |-- Ano: string (nullable = true)
 |-- Especie: string (nullable = true)
 |-- Unidade_da_Federacao: string (nullable = true)
 |-- Clientela: string (nullable = true)
 |-- Quantidade_Beneficios_Mantidos: string (nullable = true)
 |-- Vlr_Benef_Mantidos: string (nullable = true)
 |-- Qte_e_Valor: string (nullable = true)
 |-- Grupo_Principais_Especies: string (nullable = true)



In [37]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.functions import *
from pyspark.sql.types import  *

In [38]:
udf = UserDefinedFunction(lambda x: x.replace(",","."), StringType())
dfSalMater = dfSalMater.withColumn("Vlr_Benef_Mantidos", udf(col("Vlr_Benef_Mantidos")).cast(DoubleType()))
dfSalMater.printSchema()

root
 |-- Ano: string (nullable = true)
 |-- Especie: string (nullable = true)
 |-- Unidade_da_Federacao: string (nullable = true)
 |-- Clientela: string (nullable = true)
 |-- Quantidade_Beneficios_Mantidos: string (nullable = true)
 |-- Vlr_Benef_Mantidos: double (nullable = true)
 |-- Qte_e_Valor: string (nullable = true)
 |-- Grupo_Principais_Especies: string (nullable = true)



In [39]:
dfSalMater.show()

+----+--------------------+--------------------+---------+------------------------------+------------------+--------------------+-------------------------+
| Ano|             Especie|Unidade_da_Federacao|Clientela|Quantidade_Beneficios_Mantidos|Vlr_Benef_Mantidos|         Qte_e_Valor|Grupo_Principais_Especies|
+----+--------------------+--------------------+---------+------------------------------+------------------+--------------------+-------------------------+
|1988|42-Ap Tempo Contr...|             Alagoas|   Urbana|                          6205|               0.2|Quantidade Benefí...|     42-Ap Tempo Contr...|
|1988|42-Ap Tempo Contr...|            Amazonas|   Urbana|                          3076|              0.13|Quantidade Benefí...|     42-Ap Tempo Contr...|
|1988|42-Ap Tempo Contr...|               Bahia|   Urbana|                         31049|              1.27|Quantidade Benefí...|     42-Ap Tempo Contr...|
|1988|42-Ap Tempo Contr...|               Ceará|   Urbana|      

In [40]:
dfSalMater.createOrReplaceTempView("salario_mater")

In [41]:
dfSalMater.printSchema()

root
 |-- Ano: string (nullable = true)
 |-- Especie: string (nullable = true)
 |-- Unidade_da_Federacao: string (nullable = true)
 |-- Clientela: string (nullable = true)
 |-- Quantidade_Beneficios_Mantidos: string (nullable = true)
 |-- Vlr_Benef_Mantidos: double (nullable = true)
 |-- Qte_e_Valor: string (nullable = true)
 |-- Grupo_Principais_Especies: string (nullable = true)



In [42]:
dfSQLSalMater = spark.sql("select * from salario_mater where Especie = '80-Salário-Maternidade' and Ano is not null")
dfSQLSalMater.show()

+----+--------------------+--------------------+---------+------------------------------+------------------+--------------------+-------------------------+
| Ano|             Especie|Unidade_da_Federacao|Clientela|Quantidade_Beneficios_Mantidos|Vlr_Benef_Mantidos|         Qte_e_Valor|Grupo_Principais_Especies|
+----+--------------------+--------------------+---------+------------------------------+------------------+--------------------+-------------------------+
|1992|80-Salário-Matern...|             Alagoas|   Urbana|                             4|              0.76|Quantidade Benefí...|     80-Salário-Matern...|
|1992|80-Salário-Matern...|             Alagoas|    Rural|                             0|               0.0|Quantidade Benefí...|     80-Salário-Matern...|
|1992|80-Salário-Matern...|            Amazonas|   Urbana|                             0|               0.0|Quantidade Benefí...|     80-Salário-Matern...|
|1992|80-Salário-Matern...|            Amazonas|    Rural|      

In [43]:
dfMediaMaternidade = spark.sql("select 'salario_maternidade' as indicador , "
                               " Ano as ano,  round((sum_valor / sum_qtd),5) as valor_medio from "
          "(SELECT Ano , "
            "    round(sum(Quantidade_Beneficios_Mantidos),5) as sum_qtd, "
            "    round(sum(Vlr_Benef_Mantidos), 5) as sum_valor"
            "  from salario_mater "
            "  group by Ano )"
            " order by Ano " )

In [44]:
dfMediaMaternidade.show() 

+-------------------+----+-----------+
|          indicador| ano|valor_medio|
+-------------------+----+-----------+
|salario_maternidade|   -|       null|
|salario_maternidade|1988|     2.0E-5|
|salario_maternidade|1989|     4.0E-4|
|salario_maternidade|1990|    0.00443|
|salario_maternidade|1991|    0.02025|
|salario_maternidade|1992|    0.30531|
|salario_maternidade|1993|   11.02098|
|salario_maternidade|1994|  111.49281|
|salario_maternidade|1995|  163.88484|
|salario_maternidade|1996|  193.49026|
|salario_maternidade|1997|  216.31316|
|salario_maternidade|1998|  234.86222|
|salario_maternidade|1999|  248.14432|
|salario_maternidade|2000|  261.90032|
|salario_maternidade|2001|  292.93459|
|salario_maternidade|2002|  327.63214|
|salario_maternidade|2003|  402.22265|
+-------------------+----+-----------+



In [45]:
# erro neste ponto: não foi possivel fazer a conexão de JDBC por algum problema de configuração
# Py4JJavaError: An error occurred while calling o412.save. java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
dfMediaMaternidade.write.format('jdbc').options(
      url='jdbc:mysql://localhost:3306/proj01',
      driver='com.mysql.jdbc.Driver',
      dbtable='beneficio_medio_ano',
      user='root',
      password='hadoop').mode('append').save();


Py4JJavaError: An error occurred while calling o412.save.
: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$6.apply(JDBCOptions.scala:78)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$6.apply(JDBCOptions.scala:78)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:78)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:34)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:55)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


In [None]:
'''
A seguir logica similar para o arquivo JSON
Ler do HDFS em Dataframe
Ver funções do Python / Spark para converter os elementos do JSON em dados
Converter o DF em Temp View
Fazer o select da tempview fazendo o SUM com group by, e depois um select externo obtendo a média
Armazenar o resultado num DF
Armazenar o DF numa tabela do MySQL / database proj01 / tabela 2
'''

In [5]:
dfBenef = spark.read.json('hdfs:///user/hduser/spark/project/CON09.json')

In [6]:
dfBenef.count()

1

In [7]:
dfBenef.printSchema()

root
 |-- nodes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- node: struct (nullable = true)
 |    |    |    |-- Ano: string (nullable = true)
 |    |    |    |-- Clientela: string (nullable = true)
 |    |    |    |-- Evolução Mensal (AnoMes): string (nullable = true)
 |    |    |    |-- Grupo/Principais Espécies: string (nullable = true)
 |    |    |    |-- Mês: string (nullable = true)
 |    |    |    |-- Qte Benefícios Concedidos: string (nullable = true)
 |    |    |    |-- Vlr Benefícios Concedidos (R$): string (nullable = true)
 |-- total: struct (nullable = true)
 |    |-- Ano: string (nullable = true)
 |    |-- Clientela: string (nullable = true)
 |    |-- Evolução Mensal (AnoMes): string (nullable = true)
 |    |-- Mês: string (nullable = true)
 |    |-- Qte Benefícios Concedidos: string (nullable = true)

