In [1]:
### Etapa de verificação de ambiente de execução prévio
import shutil
import os

if os.path.exists("ml"):
    shutil.rmtree("ml")
if os.path.exists("data_parquet"):
    shutil.rmtree("data_parquet")

In [2]:
from delta.tables import *
from delta import *
from pyspark.sql.functions import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import json
import requests
from IPython.display import clear_output

#Inicialização do ambiente Spark
#Configuração necessária para utilização do Delta Lake
builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
clear_output(wait = True)

if(spark):
    print("Sessão Spark Inicializada com Sucesso!")


Sessão Spark Inicializada com Sucesso!


Atividade 1: Ingestão e normalização dos dados

In [3]:
#Abertura dos dados iniciais referentes aos pedidos
raw_path = "pedidos.csv"
raw_df = spark.read\
                .format("csv")\
                .option("inferSchema","true")\
                .option("header","true")\
                .load(raw_path)

In [4]:
raw_df.show(10)

+---+---------+---------+--------------------+------------+-------------------+---------------+-----+
| id|     nome|sobrenome|               email|valor_pedido|        data_pedido|        tipo_cc|id_ip|
+---+---------+---------+--------------------+------------+-------------------+---------------+-----+
|  1|Aleksandr| Crighton|acrighton0@instag...|      $71.31|2021-08-21 14:18:35|            jcb|    1|
|  2|      Tad|   Arangy|tarangy1@studiopr...|      $28.12|2021-10-28 22:44:27|        maestro|    2|
|  3|     Wake|   Samart|   wsamart2@ning.com|     $148.18|2022-06-26 22:33:37|  visa-electron|    3|
|  4|    Svend|  Morfell|smorfell3@arizona...|     $148.74|2021-12-23 15:19:03|            jcb|    4|
|  5|   Phoebe|Wealthall|pwealthall4@smugm...|     $146.84|2022-04-16 23:08:17|        maestro|    5|
|  6|    Sayre|   Ashbey|sashbey5@google.c...|     $118.62|2022-06-14 22:24:33|            jcb|    6|
|  7|      Dun|   Breens|dbreens6@seattlet...|     $136.92|2021-09-24 17:54:06|  v

In [5]:
#Conversão dos dados para Parquet
raw_df.select("*").write.format("parquet").save("data_parquet")
df = spark.read.parquet("data_parquet")

                                                                                

In [6]:
# Cria os diretórios referentes as camadas do pipeline
os.mkdir('ml')
os.mkdir("ml/bronze")
os.mkdir("ml/silver")
os.mkdir("ml/gold")

In [7]:
# Path da Layer Bronze
DELTALAKE_BRONZE_PATH = "ml/bronze"

# Inicializa a tabela delta bronze com os dados inciais
df.write.format('delta').mode('overwrite').save(DELTALAKE_BRONZE_PATH)

# Registra a tabela SQL no banco de dados
spark.sql(f"CREATE TABLE bronze USING delta LOCATION '{DELTALAKE_BRONZE_PATH}'") 

# Leitura da tabela
request_stats = spark.read.format("delta").load(DELTALAKE_BRONZE_PATH)

display(request_stats)

                                                                                

DataFrame[id: int, nome: string, sobrenome: string, email: string, valor_pedido: string, data_pedido: string, tipo_cc: string, id_ip: int]

In [8]:
request_stats.show(10)

+---+---------+---------+--------------------+------------+-------------------+---------------+-----+
| id|     nome|sobrenome|               email|valor_pedido|        data_pedido|        tipo_cc|id_ip|
+---+---------+---------+--------------------+------------+-------------------+---------------+-----+
|  1|Aleksandr| Crighton|acrighton0@instag...|      $71.31|2021-08-21 14:18:35|            jcb|    1|
|  2|      Tad|   Arangy|tarangy1@studiopr...|      $28.12|2021-10-28 22:44:27|        maestro|    2|
|  3|     Wake|   Samart|   wsamart2@ning.com|     $148.18|2022-06-26 22:33:37|  visa-electron|    3|
|  4|    Svend|  Morfell|smorfell3@arizona...|     $148.74|2021-12-23 15:19:03|            jcb|    4|
|  5|   Phoebe|Wealthall|pwealthall4@smugm...|     $146.84|2022-04-16 23:08:17|        maestro|    5|
|  6|    Sayre|   Ashbey|sashbey5@google.c...|     $118.62|2022-06-14 22:24:33|            jcb|    6|
|  7|      Dun|   Breens|dbreens6@seattlet...|     $136.92|2021-09-24 17:54:06|  v

In [9]:
#Seleção das colunas para concatenação e abertura de nova coluna com os dados concatenados
concat_col_list = ['nome','sobrenome']
request_stats = request_stats.withColumn('nome_completo',concat_ws('_',*concat_col_list))

#Remoção das colunas concatenadas
request_stats = request_stats.drop("nome").drop("sobrenome")

#Reorganização da tabela 
request_stats.select("id","nome_completo","email","valor_pedido","data_pedido","tipo_cc","id_ip")
request_stats = request_stats.select("id","nome_completo","email","valor_pedido","data_pedido","tipo_cc","id_ip")

In [10]:
request_stats.show(10)

+---+------------------+--------------------+------------+-------------------+---------------+-----+
| id|     nome_completo|               email|valor_pedido|        data_pedido|        tipo_cc|id_ip|
+---+------------------+--------------------+------------+-------------------+---------------+-----+
|  1|Aleksandr_Crighton|acrighton0@instag...|      $71.31|2021-08-21 14:18:35|            jcb|    1|
|  2|        Tad_Arangy|tarangy1@studiopr...|      $28.12|2021-10-28 22:44:27|        maestro|    2|
|  3|       Wake_Samart|   wsamart2@ning.com|     $148.18|2022-06-26 22:33:37|  visa-electron|    3|
|  4|     Svend_Morfell|smorfell3@arizona...|     $148.74|2021-12-23 15:19:03|            jcb|    4|
|  5|  Phoebe_Wealthall|pwealthall4@smugm...|     $146.84|2022-04-16 23:08:17|        maestro|    5|
|  6|      Sayre_Ashbey|sashbey5@google.c...|     $118.62|2022-06-14 22:24:33|            jcb|    6|
|  7|        Dun_Breens|dbreens6@seattlet...|     $136.92|2021-09-24 17:54:06|  visa-electr

In [11]:
# Remoção do caractere "$" da coluna de valores do pedido e conversão para tipo double
# Conversão do tipo da coluna de datas para timestamp
request_stats = request_stats.withColumn('valor_pedido', regexp_replace('valor_pedido', '\$','').cast('double'))
request_stats = request_stats.withColumn('data_pedido', to_timestamp("data_pedido", 'yyyy-MM-dd HH:mm:ss'))

In [12]:
request_stats.show(10)
display(request_stats)

+---+------------------+--------------------+------------+-------------------+---------------+-----+
| id|     nome_completo|               email|valor_pedido|        data_pedido|        tipo_cc|id_ip|
+---+------------------+--------------------+------------+-------------------+---------------+-----+
|  1|Aleksandr_Crighton|acrighton0@instag...|       71.31|2021-08-21 14:18:35|            jcb|    1|
|  2|        Tad_Arangy|tarangy1@studiopr...|       28.12|2021-10-28 22:44:27|        maestro|    2|
|  3|       Wake_Samart|   wsamart2@ning.com|      148.18|2022-06-26 22:33:37|  visa-electron|    3|
|  4|     Svend_Morfell|smorfell3@arizona...|      148.74|2021-12-23 15:19:03|            jcb|    4|
|  5|  Phoebe_Wealthall|pwealthall4@smugm...|      146.84|2022-04-16 23:08:17|        maestro|    5|
|  6|      Sayre_Ashbey|sashbey5@google.c...|      118.62|2022-06-14 22:24:33|            jcb|    6|
|  7|        Dun_Breens|dbreens6@seattlet...|      136.92|2021-09-24 17:54:06|  visa-electr

DataFrame[id: int, nome_completo: string, email: string, valor_pedido: double, data_pedido: timestamp, tipo_cc: string, id_ip: int]

In [13]:
# Path da layer Silver
DELTALAKE_SILVER_PATH = "ml/silver"

# Inicializa a tabela delta silver com os dados inciais
request_stats.write.format('delta').mode('overwrite').save(DELTALAKE_SILVER_PATH)

# Registra a tabela SQL no banco de dados
spark.sql("CREATE TABLE if not exists silver USING DELTA LOCATION '" + DELTALAKE_SILVER_PATH + "'")

# Leitura da tabela
request_stats = spark.read.format("delta").load(DELTALAKE_SILVER_PATH)

display(request_stats)

                                                                                

DataFrame[id: int, nome_completo: string, email: string, valor_pedido: double, data_pedido: timestamp, tipo_cc: string, id_ip: int]

Atividade 2: Enriquecimento dos dados

In [14]:
#Abertura dos dados iniciais referentes aos ips
ip_path = "ips.csv"
ip_df = spark.read\
                .format("csv")\
                .option("inferSchema","true")\
                .option("header","true")\
                .load(ip_path)
ip_df.show(10)


+---+--------------+--------------------+
| id|            ip|          user_agent|
+---+--------------+--------------------+
|  1|126.65.243.231|Mozilla/5.0 (Wind...|
|  2| 181.81.197.29|Mozilla/5.0 (X11;...|
|  3|  225.79.10.95|Mozilla/5.0 (Wind...|
|  4| 138.177.78.33|Mozilla/5.0 (Maci...|
|  5|   2.85.122.24|Mozilla/5.0 (Maci...|
|  6| 188.90.71.147|Mozilla/5.0 (Wind...|
|  7| 75.104.245.95|Mozilla/5.0 (Wind...|
|  8|149.11.100.128|Mozilla/5.0 (Wind...|
|  9|201.138.216.30|Mozilla/5.0 (Wind...|
| 10| 206.28.109.87|Mozilla/5.0 (Wind...|
+---+--------------+--------------------+
only showing top 10 rows



In [15]:
#Função com base na API IPWHOIS para obtenção dos dados para enriquecimento da tabela
def ipwhois(ip):
    url = requests.get(f'https://ipwhois.app/json/{ip}')
    json = url.json()
    sucesso = json['success']
    ip = ip
    if sucesso == True:
        regiao = json['region']
        cidade = json['city']
        pais = json['country']
    else:
        regiao = ""
        cidade = ""
        pais = ""
    return ip, regiao, cidade, pais

#Iteração da coluna de ips
data_collect = ip_df.select("ip").rdd.flatMap(lambda x: x).collect()

#Inicialização de novo dataframe com dados provenientes da API IPWHOIS
rdd = [ipwhois(x) for x in data_collect]
newColumns = ["ip", "regiao","cidade","pais"]
new_ip_df = spark.createDataFrame(data=rdd, schema = newColumns)
new_ip_df.printSchema()
new_ip_df.show(truncate=False)


root
 |-- ip: string (nullable = true)
 |-- regiao: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- pais: string (nullable = true)

+---------------+--------------------+-------------+-------------+
|ip             |regiao              |cidade       |pais         |
+---------------+--------------------+-------------+-------------+
|126.65.243.231 |Saitama Prefecture  |Yashio       |Japan        |
|181.81.197.29  |Santa Fe            |Rosario      |Argentina    |
|225.79.10.95   |                    |             |             |
|138.177.78.33  |Pennsylvania        |Whitehall    |United States|
|2.85.122.24    |Macedonia and Thrace|Kondariotissa|Greece       |
|188.90.71.147  |Gelderland          |Brakel       |Netherlands  |
|75.104.245.95  |New York            |New York     |United States|
|149.11.100.128 |Overijssel          |Oldenzaal    |Netherlands  |
|201.138.216.30 |Mexico City         |Mexico City  |Mexico       |
|206.28.109.87  |New York            |New Yo

In [16]:
#Join dos dataframes referentes a ip
IP_df = ip_df.join(new_ip_df, on=['ip'], how='inner').drop('ip')

#Novo parquet na Layer Silver referente a tabela de ips enriquecida
IP_df.select("*").write.format("parquet").save("ml/silver/ip_parquet")

In [17]:
IP_df.show(10)

+---+--------------------+--------------------+-------------+-------------+
| id|          user_agent|              regiao|       cidade|         pais|
+---+--------------------+--------------------+-------------+-------------+
|  1|Mozilla/5.0 (Wind...|  Saitama Prefecture|       Yashio|        Japan|
|  2|Mozilla/5.0 (X11;...|            Santa Fe|      Rosario|    Argentina|
|  3|Mozilla/5.0 (Wind...|                    |             |             |
|  4|Mozilla/5.0 (Maci...|        Pennsylvania|    Whitehall|United States|
|  5|Mozilla/5.0 (Maci...|Macedonia and Thrace|Kondariotissa|       Greece|
|  6|Mozilla/5.0 (Wind...|          Gelderland|       Brakel|  Netherlands|
|  7|Mozilla/5.0 (Wind...|            New York|     New York|United States|
|  8|Mozilla/5.0 (Wind...|          Overijssel|    Oldenzaal|  Netherlands|
|  9|Mozilla/5.0 (Wind...|         Mexico City|  Mexico City|       Mexico|
| 10|Mozilla/5.0 (Wind...|            New York|     New York|United States|
+---+-------

Atividade 3: Transformação dos dados

In [18]:

# Agregação de dados das duas tabelas (pedidos e ips) e remoção das colunas de pouca relevância
aggregated_df = request_stats.join(IP_df.withColumnRenamed("id", "id_ip"), on=['id_ip'], how='inner').drop("id_ip").drop("user_agent")

# Path da layer Gold
DELTALAKE_GOLD_PATH = "ml/gold"

# Inicializa a tabela parquet gold com os dados inciais
aggregated_df.write.format('delta').save(DELTALAKE_GOLD_PATH)

# Registra a tabela SQL no banco de dados
spark.sql(f"CREATE TABLE if not exists gold USING delta LOCATION '{DELTALAKE_GOLD_PATH}'")

display(aggregated_df)

                                                                                

DataFrame[id: int, nome_completo: string, email: string, valor_pedido: double, data_pedido: timestamp, tipo_cc: string, regiao: string, cidade: string, pais: string]

In [19]:
aggregated_df.show(10, vertical=True)

-RECORD 0-----------------------------
 id            | 1                    
 nome_completo | Aleksandr_Crighton   
 email         | acrighton0@instag... 
 valor_pedido  | 71.31                
 data_pedido   | 2021-08-21 14:18:35  
 tipo_cc       | jcb                  
 regiao        | Saitama Prefecture   
 cidade        | Yashio               
 pais          | Japan                
-RECORD 1-----------------------------
 id            | 2                    
 nome_completo | Tad_Arangy           
 email         | tarangy1@studiopr... 
 valor_pedido  | 28.12                
 data_pedido   | 2021-10-28 22:44:27  
 tipo_cc       | maestro              
 regiao        | Santa Fe             
 cidade        | Rosario              
 pais          | Argentina            
-RECORD 2-----------------------------
 id            | 3                    
 nome_completo | Wake_Samart          
 email         | wsamart2@ning.com    
 valor_pedido  | 148.18               
 data_pedido   | 2022-06-