In [15]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col, count

In [2]:
# Reuse the active Spark session if one already exists, otherwise start one
# with the default configuration.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

In [3]:
# Absolute location of the monthly source CSV (adjacent raw literals are
# concatenated by the parser, so the value is a single path string).
path_file = (
    r'C:\Workspace\DataAnalytics\Auxilio_Emergencial'
    r'\csv_aux_emergencial\202103_AuxilioEmergencial.csv'
)

In [4]:
# Load the raw file with its first line as header.
# The delimiter is the three-character sequence quote-semicolon-quote rather
# than a plain ';'.
# NOTE(review): presumably this leaves a stray '"' at the start of the first
# field and at the end of the last field of each line -- confirm.
csv_reader = spark.read.options(encoding='ISO-8859-1', sep='";"', header=True)
df = csv_reader.csv(path_file)

In [5]:
# Positional rename: toDF assigns these names to the existing columns in
# order, so the list length must match the number of columns in the frame.
nomes_colunas = [
    'ANO_MES',
    'UF',
    'COD_MUNICIPIO',
    'NOME_MUNICIPIO',
    'NIS_BENEF',
    'CPF_BENEF',
    'NOME_BENEF',
    'NIS_RESP',
    'CPF_RESP',
    'NOME_RESP',
    'ENQUADRAMENTO',
    'PARCELA',
    'OBS',
    'VALOR',
]
df = df.toDF(*nomes_colunas)

In [6]:
# Clean the two columns that need it:
#   ANO_MES - drop any double-quote characters
#   VALOR   - drop double quotes, turn the decimal comma into a dot,
#             then cast the text to a float column
df2 = (
    df
    .withColumn('ANO_MES', regexp_replace(col('ANO_MES'), '"', ''))
    .withColumn('VALOR', regexp_replace(col('VALOR'), '"', ''))
    .withColumn('VALOR', regexp_replace(col('VALOR'), ',', '.'))
    .withColumn('VALOR', col('VALOR').cast('float'))
)

In [11]:
# Peek at two rows that carry a municipality code.
df2.where(col('COD_MUNICIPIO').isNotNull()).show(2)

+-------+---+-------------+--------------+---------+---------+----------+--------+--------+-------------+-------------+-------+------+-----+
|ANO_MES| UF|COD_MUNICIPIO|NOME_MUNICIPIO|NIS_BENEF|CPF_BENEF|NOME_BENEF|NIS_RESP|CPF_RESP|    NOME_RESP|ENQUADRAMENTO|PARCELA|   OBS|VALOR|
+-------+---+-------------+--------------+---------+---------+----------+--------+--------+-------------+-------------+-------+------+-----+
| 202103| AC|      1200054|  ASSIS BRASIL|       -3|     null|  Inválido|      -2|    null|Não se aplica|     EXTRACAD|     3ª|Não há|600.0|
| 202103| AC|      1200054|  ASSIS BRASIL|       -3|     null|  Inválido|      -2|    null|Não se aplica|     EXTRACAD|     5ª|Não há|600.0|
+-------+---+-------------+--------------+---------+---------+----------+--------+--------+-------------+-------------+-------+------+-----+
only showing top 2 rows



In [19]:
# Peek at two rows that have a municipality code but no beneficiary CPF.
tem_municipio = col('COD_MUNICIPIO').isNotNull()
sem_cpf = col('CPF_BENEF').isNull()
df2.where(tem_municipio & sem_cpf).show(2)

+-------+---+-------------+--------------+---------+---------+----------+--------+--------+-------------+-------------+-------+------+-----+
|ANO_MES| UF|COD_MUNICIPIO|NOME_MUNICIPIO|NIS_BENEF|CPF_BENEF|NOME_BENEF|NIS_RESP|CPF_RESP|    NOME_RESP|ENQUADRAMENTO|PARCELA|   OBS|VALOR|
+-------+---+-------------+--------------+---------+---------+----------+--------+--------+-------------+-------------+-------+------+-----+
| 202103| AC|      1200054|  ASSIS BRASIL|       -3|     null|  Inválido|      -2|    null|Não se aplica|     EXTRACAD|     3ª|Não há|600.0|
| 202103| AC|      1200054|  ASSIS BRASIL|       -3|     null|  Inválido|      -2|    null|Não se aplica|     EXTRACAD|     5ª|Não há|600.0|
+-------+---+-------------+--------------+---------+---------+----------+--------+--------+-------------+-------------+-------+------+-----+
only showing top 2 rows



In [21]:
# Number of rows per (month, municipality) among rows where both the
# municipality code and the beneficiary CPF are present.
(
    df2
    .where(col('COD_MUNICIPIO').isNotNull() & col('CPF_BENEF').isNotNull())
    .groupBy('ANO_MES', 'COD_MUNICIPIO')
    .agg(count(col('COD_MUNICIPIO')).alias('TOTAL_BENEF'))
    .show()
)

+-------+-------------+-----------+
|ANO_MES|COD_MUNICIPIO|TOTAL_BENEF|
+-------+-------------+-----------+
| 202103|      2707107|          6|
| 202103|      2906907|          4|
| 202103|      5213756|          1|
| 202103|      3120508|          4|
| 202103|      3129657|          4|
| 202103|      3152105|          9|
| 202103|      5001102|         19|
| 202103|      4117206|          3|
| 202103|      2703007|          5|
| 202103|      2914802|         24|
| 202103|      2914901|          3|
| 202103|      2920403|          5|
| 202103|      2925758|          4|
| 202103|      2307007|          3|
| 202103|      2105302|         68|
| 202103|      2111904|          6|
| 202103|      3117702|          4|
| 202103|      3154200|          5|
| 202103|      2607208|         23|
| 202103|      2616308|         72|
+-------+-------------+-----------+
only showing top 20 rows



In [20]:
# Number of rows per (month, municipality) among rows that have a
# municipality code but a null beneficiary CPF.
(
    df2
    .where(col('COD_MUNICIPIO').isNotNull() & col('CPF_BENEF').isNull())
    .groupBy('ANO_MES', 'COD_MUNICIPIO')
    .agg(count(col('COD_MUNICIPIO')).alias('TOTAL_ANONIMOS'))
    .show()
)

+-------+-------------+--------------+
|ANO_MES|COD_MUNICIPIO|TOTAL_ANONIMOS|
+-------+-------------+--------------+
| 202103|      3132909|             5|
| 202103|      5001102|             5|
| 202103|      2914802|             5|
| 202103|      2105302|            11|
| 202103|      3154200|             5|
| 202103|      2616308|            18|
| 202103|      2304103|             5|
| 202103|      2311801|             5|
| 202103|      5204508|             5|
| 202103|      3134707|             5|
| 202103|      3170057|             5|
| 202103|      2514206|             5|
| 202103|      2509107|             2|
| 202103|      2513802|             5|
| 202103|      2913002|             9|
| 202103|      3138401|             5|
| 202103|      2706703|             9|
| 202103|      3157807|             6|
| 202103|      3167103|             5|
| 202103|      2303600|             5|
+-------+-------------+--------------+
only showing top 20 rows



In [7]:
# Sum VALOR for every (month, municipality, ENQUADRAMENTO, PARCELA)
# combination, ignoring rows without a municipality code.
# The aggregated column keeps Spark's default name, 'sum(VALOR)'.
df_final = (
    df2
    .where(col('COD_MUNICIPIO').isNotNull())
    .groupBy('ANO_MES', 'COD_MUNICIPIO', 'ENQUADRAMENTO', 'PARCELA')
    .sum('VALOR')
)

In [8]:
# Ascending sort by municipality, then ENQUADRAMENTO, then PARCELA.
df_final_order = df_final.orderBy('COD_MUNICIPIO', 'ENQUADRAMENTO', 'PARCELA')

In [46]:
# Bring a single ANO_MES row back to the driver; the result is a list of
# Row objects (at most one element).
ano_mes = df_final_order.select('ANO_MES').take(1)

In [47]:
# Root folder under which the per-month output folder is placed.
destino = r'C:\Workspace\DataAnalytics\Auxilio_Emergencial\csv_aux_emergencial'
# ano_mes is a list of Rows; [0][0] is the ANO_MES value of its first Row.
# os.path.join inserts the platform separator instead of a hand-written '\\'.
path_destino = os.path.join(destino, ano_mes[0][0])

In [51]:
# Make sure the output folder exists. exist_ok=True makes re-running the cell
# a no-op, and makedirs also creates any missing parent folders instead of
# failing with FileNotFoundError.
os.makedirs(path_destino, exist_ok=True)

In [57]:
# Destination of the aggregated result inside the per-month folder.
file_csv = os.path.join(path_destino, 'AuxilioEmergencial.csv')
# toPandas() collects the whole Spark frame to the driver before writing.
# NOTE(review): pandas writes the row index as an unnamed first column by
# default; pass index=False if that column is not wanted -- confirm with
# whatever consumes this file.
df_final_order.toPandas().to_csv(file_csv)

In [59]:
# Distinct (COD_MUNICIPIO, NOME_MUNICIPIO, UF) combinations, dropping rows
# without a municipality code.
# Built from df2: the original referenced `df5`, which is not defined by any
# cell in this notebook and raised NameError on a fresh kernel. df2 is the
# cleaned frame that still carries these three columns.
carga_municipios = (
    df2
    .select('COD_MUNICIPIO', 'NOME_MUNICIPIO', 'UF')
    .distinct()
    .filter("COD_MUNICIPIO IS NOT NULL")
)

In [60]:
# Destination of the municipality lookup table inside the per-month folder.
csv_municipio = os.path.join(path_destino, 'municipios.csv')
# toPandas() collects the whole Spark frame to the driver before writing.
# NOTE(review): pandas writes the row index as an unnamed first column by
# default; pass index=False if that column is not wanted -- confirm.
carga_municipios.toPandas().to_csv(csv_municipio)

In [3]:
# Locations used by the download / archive step.
# NOTE(review): these roots ('DataAnalystics\\Aux_Emergencial') differ from the
# 'DataAnalytics\\Auxilio_Emergencial' root used by the CSV cells -- confirm
# which spelling is intended.
path_download = 'C:\\Users\\antoliverjr\\Downloads'
path_historico = 'C:\\Workspace\\DataAnalystics\\Aux_Emergencial\\historico_zip'
path_files = 'C:\\Workspace\\DataAnalystics\\Aux_Emergencial\\csv_aux_emergencial'
url_download = 'https://www.portaldatransparencia.gov.br/download-de-dados/auxilio-emergencial/'

# Ensure both working folders exist. makedirs with exist_ok=True is a no-op
# when the folder is already there and, unlike os.mkdir, also creates missing
# parent folders instead of raising FileNotFoundError.
os.makedirs(path_files, exist_ok=True)
os.makedirs(path_historico, exist_ok=True)

# Archive to process, with its expected location in the download folder and
# its target location in the history folder.
file_name = '202104_AuxilioEmergencial.zip'
path_file_origin = os.path.join(path_download, file_name)
path_file_job = os.path.join(path_historico, file_name)

print(file_name)
print(path_historico)

# List whatever is currently stored in the history folder.
for files in os.listdir(path_historico):
    print(files)

202104_AuxilioEmergencial.zip
C:\Workspace\DataAnalystics\Aux_Emergencial\historico_zip
