In [1]:
%%bash

# Instal Java
apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
pip install -q pyspark

In [2]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [3]:
# CREATE DATALAKE
#mkdir = make directory

%%bash
mkdir landing
mkdir processing
mkdir curated

In [4]:
# Extraction Process

%%bash
cd landing 
wget http://repositorio.dados.gov.br/seges/comprasnet_contratos/anual/comprasnet-contratos-anual-cronogramas-latest.csv

--2022-03-25 13:27:54--  http://repositorio.dados.gov.br/seges/comprasnet_contratos/anual/comprasnet-contratos-anual-cronogramas-latest.csv
Resolving repositorio.dados.gov.br (repositorio.dados.gov.br)... 189.9.7.16
Connecting to repositorio.dados.gov.br (repositorio.dados.gov.br)|189.9.7.16|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 217078190 (207M) [application/octet-stream]
Saving to: ‘comprasnet-contratos-anual-cronogramas-latest.csv’

     0K .......... .......... .......... .......... ..........  0%  127K 27m43s
    50K .......... .......... .......... .......... ..........  0%  261K 20m37s
   100K .......... .......... .......... .......... ..........  0% 8.43M 13m53s
   150K .......... .......... .......... .......... ..........  0% 12.1M 10m29s
   200K .......... .......... .......... .......... ..........  0%  262K 11m4s
   250K .......... .......... .......... .......... ..........  0% 21.6M 9m15s
   300K .......... .......... .......... .......

In [7]:
# READING DATA WITH SPARK AND OPTIMIZING DATA

df = spark.read.option('header',True).csv('/content/landing/comprasnet-contratos-anual-cronogramas-latest.csv') 
df.write.mode('overwrite').format('parquet').save('/content/processing')

In [11]:
# READING DATA AND CREATING VIEW

df = spark.read.parquet('/content/processing')
df.createOrReplaceTempView('df')

In [37]:
# TRANSFORMING DATA

df2 = spark.sql(""" 
  
WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal (10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

""")

In [38]:
# DELIVERYING THE DATA

(
    df2.orderBy('year', ascending=False)
    .orderBy('month', ascending=False)
    .orderBy('day', ascending=False)
    .write.partitionBy('year','month','day')
    .mode('overwrite')
    .format('parquet')
    .save('/content/curated')
)