In [25]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import regexp_extract, regexp_replace, col, round, initcap, trim, hour, date_format, dayofweek, when
from pyspark.sql.types import IntegerType, BooleanType, StringType
import os

spark = SparkSession.builder.appName("pipeline").getOrCreate()

In [36]:
from pathlib import Path

BASE_DIR = Path.cwd().parent
DATA_DIR = BASE_DIR / "data"

# Tabela Cafes

In [38]:
cafes_path = DATA_DIR / "cafes.csv"
df_cafes = df = spark.read.option("header", True).csv(str(cafes_path))

df_cafes.show()


+---+---+--------------------+--------------------+--------------------+----------------+-----------+----------+--------+--------------+------+
|_c0| id|                name|             map_url|             img_url|        location|has_sockets|has_toilet|has_wifi|can_take_calls| seats|
+---+---+--------------------+--------------------+--------------------+----------------+-----------+----------+--------+--------------+------+
|  0|  1|Science Gallery L...|https://g.page/sc...|https://atlondonb...|   London Bridge|          1|         1|       0|             1|   50+|
|  1|  2|Social - Copeland...|https://g.page/Co...|https://images.sq...|         Peckham|          1|         1|       1|             0| 20-30|
|  2|  3|One & All Cafe Pe...|https://g.page/on...|https://lh3.googl...|         Peckham|          1|         1|       1|             0| 20-30|
|  3|  4|           Old Spike|https://www.googl...|https://lh3.googl...|         Peckham|          1|         0|       1|             0|

26/02/16 10:28:39 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , id, name, map_url, img_url, location, has_sockets, has_toilet, has_wifi, can_take_calls, seats
 Schema: _c0, id, name, map_url, img_url, location, has_sockets, has_toilet, has_wifi, can_take_calls, seats
Expected: _c0 but found: 
CSV file: file:///Users/nancydasilvatavaressantos/Meu%20Drive/backup/Faculdade/projects/cafe_analises/pipeline/data/cafes.csv


## `seats` column
Desmembrar em `min_seats` e `max_seats`

Analise exploratoria 

In [27]:
def seats_column(df: DataFrame) -> DataFrame:
    '''
    Desmembrar colunas `seats` em `min_seats` e `max_seats`
    '''
    
    # Normalizar
    df = df.withColumn(
        "seats",
        regexp_replace(col("seats"), r"\s*-\s*", "-")
    ).withColumn(
        "seats",
        regexp_replace(col("seats"), r"\+$", "-50")
    )

    # Cria colunas
    min_col = regexp_extract(col("seats"), r"^(\d+)", 1)
    max_col = regexp_extract(col("seats"), r"-(\d+)$", 1)

    df = df.withColumn("min_seats", min_col.cast(IntegerType()))
    df = df.withColumn(
        "max_seats",
        when(max_col != "", max_col.cast(IntegerType()))
        .otherwise(col("min_seats"))
    )
    df = df.withColumn(
        "avg_seats",
        ((col("min_seats") + col("max_seats")) / 2).cast(IntegerType())
    )

    return df

df_cafes = seats_column(df_cafes)
df_cafes.select("seats", "min_seats", "max_seats", "avg_seats").show()

+-----+---------+---------+---------+
|seats|min_seats|max_seats|avg_seats|
+-----+---------+---------+---------+
|50-50|       50|       50|       50|
|20-30|       20|       30|       25|
|20-30|       20|       30|       25|
| 0-10|        0|       10|        5|
|20-30|       20|       30|       25|
|50-50|       50|       50|       50|
|50-50|       50|       50|       50|
|10-20|       10|       20|       15|
|20-30|       20|       30|       25|
|20-30|       20|       30|       25|
|30-40|       30|       40|       35|
|20-30|       20|       30|       25|
|30-40|       30|       40|       35|
|50-50|       50|       50|       50|
|20-30|       20|       30|       25|
|10-20|       10|       20|       15|
|30-40|       30|       40|       35|
| 0-10|        0|       10|        5|
|40-50|       40|       50|       45|
|10-20|       10|       20|       15|
+-----+---------+---------+---------+
only showing top 20 rows


## Colunas binárias 
`has_sockets`, `has_toilets`, `has_wifi` e `can_take_calls`

In [28]:
def convert_columns_to_binary(df: DataFrame, columns_names: list[str]) -> DataFrame:
    '''
    Converte colunas para o tipo booleano
    '''

    existing_cols = set(df.columns)

    for c in columns_names:
        if c in existing_cols:
            df = df.withColumn(c, col(c).cast(BooleanType()))

    return df

cols_to_convert = ["has_sockets", "has_toilet", "has_wifi", "can_take_calls"]
df_cafes = convert_columns_to_binary(df_cafes, cols_to_convert)
df_cafes.select(cols_to_convert).printSchema()
df_cafes.select(cols_to_convert).show()

root
 |-- has_sockets: boolean (nullable = true)
 |-- has_toilet: boolean (nullable = true)
 |-- has_wifi: boolean (nullable = true)
 |-- can_take_calls: boolean (nullable = true)

+-----------+----------+--------+--------------+
|has_sockets|has_toilet|has_wifi|can_take_calls|
+-----------+----------+--------+--------------+
|       true|      true|   false|          true|
|       true|      true|    true|         false|
|       true|      true|    true|         false|
|       true|     false|    true|         false|
|       true|      true|    true|         false|
|       true|      true|    true|         false|
|       true|      true|    true|         false|
|       true|      true|    true|         false|
|       true|      true|    true|          true|
|      false|      true|    true|         false|
|      false|      true|    true|          true|
|       true|      true|    true|          true|
|       true|      true|    true|         false|
|      false|      true|    true|  

## `location`

Análise Exploratória

In [31]:
def trim_text_columns(df: DataFrame, columns_names: list[str]) -> DataFrame:
    '''
    Remove espaços no início e fim de colunas string
    '''
    
    existing_cols = set(df.columns)
    
    for c in columns_names:
        if c in existing_cols:
            df = df.withColumn(c, trim(col(c)))
    
    return df

cols_to_trim = ["name", "location"]
df_cafes = trim_text_columns(df_cafes, cols_to_trim)
df_cafes.select(cols_to_trim).show()

+--------------------+----------------+
|                name|        location|
+--------------------+----------------+
|Science Gallery L...|   London Bridge|
|Social - Copeland...|         Peckham|
|One & All Cafe Pe...|         Peckham|
|           Old Spike|         Peckham|
|Fuckoffee Bermondsey|      Bermondsey|
|  Mare Street Market|         Hackney|
|Ace Hotel Shoreditch|      Shoreditch|
| Goswell Road Coffee|     Clerkenwell|
|The Southwark Cat...|   London Bridge|
|Trade Commercial ...|     Whitechapel|
|The Tate Modern Cafe|        Bankside|
|         Forage Cafe|     Clerkenwell|
|Citizen M Hotel S...|      Shoreditch|
|     Barbican Centre|        Barbican|
|The Slaughtered Lamb|     Clerkenwell|
|Fernandez and Wel...|South Kensington|
|   Whitechapel Grind|     Whitechapel|
| The Peckham Pelican|         Peckham|
|Natural History M...|South Kensington|
|       The Bike Shed|      Shoreditch|
+--------------------+----------------+
only showing top 20 rows


## `establishment_type`

In [33]:
def establishment_type_column(df: DataFrame) -> DataFrame:
    '''
    criar coluna categorica establishment_type
    '''
    
    df = df.withColumn(
        "establishment_type",
        when(col("avg_seats") < 10, "Coffee Stand / To Go")
        .when(
            col("has_wifi") & col("has_sockets") & (col("avg_seats") > 40),
            "Co-working Friendly"
        )
        .when(col("avg_seats") > 40, "Large Cafe")
        .otherwise("Standard Cafe")
    )

    return df

df_cafes = establishment_type_column(df_cafes)
df_cafes.select("establishment_type").show()

+--------------------+
|  establishment_type|
+--------------------+
|          Large Cafe|
|       Standard Cafe|
|       Standard Cafe|
|Coffee Stand / To Go|
|       Standard Cafe|
| Co-working Friendly|
| Co-working Friendly|
|       Standard Cafe|
|       Standard Cafe|
|       Standard Cafe|
|       Standard Cafe|
|       Standard Cafe|
|       Standard Cafe|
|          Large Cafe|
|       Standard Cafe|
|       Standard Cafe|
|       Standard Cafe|
|Coffee Stand / To Go|
| Co-working Friendly|
|       Standard Cafe|
+--------------------+
only showing top 20 rows


# Tabela Menu item 

In [39]:
menu_items_path = DATA_DIR / "menu_items.csv"

df_menu_items = spark.read.option("header", True).csv(str(menu_items_path))
df_menu_items.show()

+---+----------------+-------------+--------+-----+----+------------+--------+--------------+----------+
| id|establishment_id|         name|category|price|cost|is_available|is_vegan|is_gluten_free|created_at|
+---+----------------+-------------+--------+-----+----+------------+--------+--------------+----------+
|  1|               1|     Espresso|  coffee|  6.5| 2.1|        True|   False|          True|2024-01-01|
|  2|               1|   Cappuccino|  coffee|  8.0| 2.8|        True|   False|          True|2024-01-01|
|  3|               1|        Latte|  coffee|  9.0| 3.0|        True|   False|          True|2024-01-01|
|  4|               1|        Mocha|  coffee| 10.0| 3.5|        True|   False|          True|2024-01-01|
|  5|               1|    Croissant|    food|  9.5| 4.0|        True|   False|         False|2024-01-01|
|  6|               1|      Brownie|    food|  7.0| 2.5|        True|   False|         False|2024-01-01|
|  7|               2|     Espresso|  coffee|  6.0| 2.0

In [41]:
df_menu_items.select("category").distinct().show()

+--------+
|category|
+--------+
|beverage|
|    food|
| dessert|
|     tea|
|  coffee|
+--------+



### Colunas de texto 
`name` e `category`

In [42]:
cols_to_trim = ["name", "category"]

df_menu_items = trim_text_columns(df_menu_items, cols_to_trim)
df_menu_items.select("name", "category").show()

+-------------+--------+
|         name|category|
+-------------+--------+
|     Espresso|  coffee|
|   Cappuccino|  coffee|
|        Latte|  coffee|
|        Mocha|  coffee|
|    Croissant|    food|
|      Brownie|    food|
|     Espresso|  coffee|
|   Cappuccino|  coffee|
|        Latte|  coffee|
| Matcha Latte|     tea|
|Pão de Queijo|    food|
|   Cheesecake| dessert|
|   Flat White|  coffee|
|    Americano|  coffee|
|   Iced Latte|  coffee|
|       Muffin|    food|
|Avocado Toast|    food|
|     Espresso|  coffee|
|      Cortado|  coffee|
|    Cold Brew|  coffee|
+-------------+--------+
only showing top 20 rows


## Métricas de Lucratividade

In [45]:
df_menu_items.printSchema()

root
 |-- id: string (nullable = true)
 |-- establishment_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: string (nullable = true)
 |-- cost: string (nullable = true)
 |-- is_available: string (nullable = true)
 |-- is_vegan: string (nullable = true)
 |-- is_gluten_free: string (nullable = true)
 |-- created_at: string (nullable = true)



In [48]:
def profit_columns(df: DataFrame) -> DataFrame:
    '''
    calcular gross margin e margin percentage
    '''

    df = df.withColumn("price", col("price").cast("double")).withColumn("cost", col("cost").cast("double"))
    
    df = df.withColumn(
        "gross_margin",
        round(col("price") - col("cost"), 2)  # Margem Bruta
    ).withColumn(
        "margin_percentage",
        when(
            col("price") != 0,
            round(((col("price") - col("cost")) / col("price")) * 100, 2)
        ).otherwise(None)  # % Margem
    )

    return df

df_menu_items = profit_columns(df_menu_items)

df_menu_items.select("price", "cost", "gross_margin", "margin_percentage").printSchema()
df_menu_items.select("price", "cost", "gross_margin", "margin_percentage").show()


root
 |-- price: double (nullable = true)
 |-- cost: double (nullable = true)
 |-- gross_margin: double (nullable = true)
 |-- margin_percentage: double (nullable = true)

+-----+----+------------+-----------------+
|price|cost|gross_margin|margin_percentage|
+-----+----+------------+-----------------+
|  6.5| 2.1|         4.4|            67.69|
|  8.0| 2.8|         5.2|             65.0|
|  9.0| 3.0|         6.0|            66.67|
| 10.0| 3.5|         6.5|             65.0|
|  9.5| 4.0|         5.5|            57.89|
|  7.0| 2.5|         4.5|            64.29|
|  6.0| 2.0|         4.0|            66.67|
|  8.5| 3.0|         5.5|            64.71|
|  9.5| 3.2|         6.3|            66.32|
| 11.0| 3.8|         7.2|            65.45|
|  6.0| 2.5|         3.5|            58.33|
| 12.0| 5.0|         7.0|            58.33|
|  7.5| 2.5|         5.0|            66.67|
|  6.0| 2.0|         4.0|            66.67|
|  9.5| 3.2|         6.3|            66.32|
|  8.0| 3.5|         4.5|           

## Faixa de Preço

In [49]:
def price_range_column(df: DataFrame) -> DataFrame:
    df = df.withColumn(
        "price_category",
        when(col("price") < 10, "Budget")
        .when((col("price") >= 10) & (col("price") <= 20), "Standard")
        .otherwise("Premium")
    )

    return df

df_menu_items = price_range_column(df_menu_items)
df_menu_items.select("name", "price", "price_category").show() 

+-------------+-----+--------------+
|         name|price|price_category|
+-------------+-----+--------------+
|     Espresso|  6.5|        Budget|
|   Cappuccino|  8.0|        Budget|
|        Latte|  9.0|        Budget|
|        Mocha| 10.0|      Standard|
|    Croissant|  9.5|        Budget|
|      Brownie|  7.0|        Budget|
|     Espresso|  6.0|        Budget|
|   Cappuccino|  8.5|        Budget|
|        Latte|  9.5|        Budget|
| Matcha Latte| 11.0|      Standard|
|Pão de Queijo|  6.0|        Budget|
|   Cheesecake| 12.0|      Standard|
|   Flat White|  7.5|        Budget|
|    Americano|  6.0|        Budget|
|   Iced Latte|  9.5|        Budget|
|       Muffin|  8.0|        Budget|
|Avocado Toast| 12.5|      Standard|
|     Espresso|  5.5|        Budget|
|      Cortado|  7.0|        Budget|
|    Cold Brew|  8.5|        Budget|
+-------------+-----+--------------+
only showing top 20 rows


In [0]:
df_menu_items.display()

id,establishment_id,name,category,price,cost,is_available,is_vegan,is_gluten_free,created_at,gross_margin,margin_percentage,price_category
1,1,Espresso,Coffee,6.5,2.1,True,False,True,2024-01-01,4.4,67.69,Budget
2,1,Cappuccino,Coffee,8.0,2.8,True,False,True,2024-01-01,5.2,65.0,Budget
3,1,Latte,Coffee,9.0,3.0,True,False,True,2024-01-01,6.0,66.67,Budget
4,1,Mocha,Coffee,10.0,3.5,True,False,True,2024-01-01,6.5,65.0,Standard
5,1,Croissant,Food,9.5,4.0,True,False,False,2024-01-01,5.5,57.89,Budget
6,1,Brownie,Food,7.0,2.5,True,False,False,2024-01-01,4.5,64.29,Budget
7,2,Espresso,Coffee,6.0,2.0,True,False,True,2024-01-02,4.0,66.67,Budget
8,2,Cappuccino,Coffee,8.5,3.0,True,False,True,2024-01-02,5.5,64.71,Budget
9,2,Latte,Coffee,9.5,3.2,True,False,True,2024-01-02,6.3,66.32,Budget
10,2,Matcha Latte,Tea,11.0,3.8,True,True,True,2024-01-02,7.2,65.45,Standard


# Tabela  sales orders items 

In [54]:
orders_items_path = DATA_DIR / "orders_items_data.json"

df_orders_items = spark.read.option("multiline", "true").json(str(orders_items_path))

df_orders_items.printSchema()
df_orders_items.show()

root
 |-- id: long (nullable = true)
 |-- menu_item_id: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- quantity: long (nullable = true)
 |-- unit_price: double (nullable = true)

+---+------------+--------+--------+----------+
| id|menu_item_id|order_id|quantity|unit_price|
+---+------------+--------+--------+----------+
| 61|          31|    1032|       3|       7.0|
| 62|          33|    1032|       3|      10.0|
| 63|          35|    1032|       2|      13.5|
| 64|          34|    1032|       2|      14.0|
+---+------------+--------+--------+----------+



## total_price

In [56]:
def total_price_column(df: DataFrame) -> DataFrame:
    df = df.withColumn(
        "total_price",
        round(col("quantity") * col("unit_price"), 2)  
    )

    return df

df_orders_items = total_price_column(df_orders_items)
df_orders_items.select("quantity", "unit_price", "total_price").show()

+--------+----------+-----------+
|quantity|unit_price|total_price|
+--------+----------+-----------+
|       3|       7.0|       21.0|
|       3|      10.0|       30.0|
|       2|      13.5|       27.0|
|       2|      14.0|       28.0|
+--------+----------+-----------+



# Tabela sales_orders

In [57]:
orders_path = DATA_DIR / "orders_data.json"

df_orders = spark.read.option("multiline", "true").json(str(orders_path))

df_orders.printSchema()
df_orders.show()


root
 |-- created_at: string (nullable = true)
 |-- establishment_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- total_amount: double (nullable = true)

+----------------+----------------+----+--------------+------------+
|      created_at|establishment_id|  id|payment_method|total_amount|
+----------------+----------------+----+--------------+------------+
|2024-02-14 15:15|               7|1032|           pix|       106.0|
+----------------+----------------+----+--------------+------------+



## Padronizacão Coluna de texto 
`payment_method`

In [59]:
cols_to_trim = ["payment_method"]
df_orders = trim_text_columns(df_orders, columns_names=cols_to_trim)

df_orders.select("payment_method").show()


+--------------+
|payment_method|
+--------------+
|           pix|
+--------------+



## Coluna de Timestamp

In [None]:
def timestamp_column(df: DataFrame) -> DataFrame:

    df = df.withColumn(
        "order_date",
        date_format(col("created_at"), "yyyy-MM-dd")  # Apenas a data
    ).withColumn(
        "order_hour",
        hour(col("created_at"))  # Hora 0-23
    ).withColumn(
        "day_of_week",
        date_format(col("created_at"), "EEEE")  # Segunda, Terça...
    ).withColumn(
        "month_name",
        date_format(col("created_at"), "MMMM")  # Janeiro, Fevereiro...
    )

    df = df.withColumn(
        "day_period",
        when((col("order_hour") >= 6) & (col("order_hour") <= 11), "Manhã")
        .when((col("order_hour") >= 12) & (col("order_hour") <= 14), "Almoço")
        .when((col("order_hour") >= 15) & (col("order_hour") <= 18), "Tarde")
        .when(col("order_hour") >= 19, "Noite")
        .otherwise("Fora do horário")  # cobre 0-5
    )

    return df

df_orders = timestamp_column(df_orders)
df_orders.select(
    "created_at", "order_date", "order_hour", "day_of_week", "month_name", "day_period"
).show()

+----------------+----------+----------+-----------+----------+----------+
|      created_at|order_date|order_hour|day_of_week|month_name|day_period|
+----------------+----------+----------+-----------+----------+----------+
|2024-02-14 15:15|2024-02-14|        15|  Wednesday|  February|     Tarde|
+----------------+----------+----------+-----------+----------+----------+



26/02/16 13:55:30 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 762660 ms exceeds timeout 120000 ms
26/02/16 13:55:30 WARN SparkContext: Killing executors is not supported by current scheduler.
26/02/16 13:55:32 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:81)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:669)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1296)
	at o

# Carregar `dfs` na proxima camada

In [0]:
dfs_list = [
    (df_cafes, "cafes"),
    (df_menu_items, "menu_items"),
    (df_sales_orders, "sales_orders"),
    (df_sales_orders_items, "sales_orders_items")
]

base_path = "/Volumes/workspace/cafes_db/transformed_data"

for df, name in dfs_list:
    output_path = f"{base_path}/{name}"
    df.write.mode("overwrite").parquet(output_path)
    print(f"Salvo {name} em {output_path}")

Salvo cafes em /Volumes/workspace/cafes_db/transformed_data/cafes
Salvo menu_items em /Volumes/workspace/cafes_db/transformed_data/menu_items
Salvo sales_orders em /Volumes/workspace/cafes_db/transformed_data/sales_orders
Salvo sales_orders_items em /Volumes/workspace/cafes_db/transformed_data/sales_orders_items
