# Feature Engineering - Tips (Spark Version)

**Pandas → Spark Conversion**

This notebook:
1. Reads tips from the Bronze layer (Spark)
2. Keeps only tips for valid businesses (Silver)
3. Builds user features (user_tip_count)
4. Builds business features:
   - tip_count (count)
   - recency_score (based on the date of the most recent tip)
5. Applies normalization (log + MinMaxScaler)
6. Saves to the Silver layer

**Advantages over Pandas:**
- ✅ Roughly 5-10x faster (see the performance summary at the end)
- ✅ No memory problems
- ✅ Reads the data lake directly

---

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, max as spark_max, log1p, 
    to_timestamp, current_timestamp, datediff, lit, broadcast
)
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F

In [2]:
# ⚡ OPTIMIZED SPARK CONFIGURATION
spark = SparkSession.builder \
    .appName("Tip Feature Engineering") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

print(f"✅ Spark version: {spark.version}")

✅ Spark version: 3.5.0


In [3]:
# Path configuration
BASE_PATH = '/home/jovyan/work'
DATA_PATH = f'{BASE_PATH}/data'
BRONZE_PATH = f'{DATA_PATH}/bronze'
SILVER_PATH = f'{DATA_PATH}/silver'

print(f"🥉 Bronze: {BRONZE_PATH}")
print(f"🥈 Silver: {SILVER_PATH}")

🥉 Bronze: /home/jovyan/work/data/bronze
🥈 Silver: /home/jovyan/work/data/silver


## 1. Load Data from the Data Lake

In [4]:
print("\n📥 [1/5] Loading data from the data lake...\n")

# 1. Tips (Bronze)
print("   📦 Loading tips...")
df_tips = spark.read.parquet(f"{BRONZE_PATH}/tip")
count_tips_total = df_tips.count()
print(f"      ✅ Tips: {count_tips_total:,} records")

# 2. Business (Silver) - already filtered
print("   📦 Loading business...")
df_business = spark.read.parquet(f"{SILVER_PATH}/business")
count_business = df_business.count()
print(f"      ✅ Business: {count_business:,} records")

print("\n✅ Data loaded!")


📥 [1/5] Loading data from the data lake...

   📦 Loading tips...
      ✅ Tips: 908,915 records
   📦 Loading business...
      ✅ Business: 64,645 records

✅ Data loaded!


In [6]:
# Inspect the tip schema
print("📋 Tip schema:\n")
df_tips.printSchema()

print("\n📊 Sample:")
df_tips.show(5, truncate=5)

📋 Tip schema:

root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)


📊 Sample:
+-----------+----------------+-----+-----+-------+
|business_id|compliment_count| date| text|user_id|
+-----------+----------------+-----+-----+-------+
|      Ea...|               0|20...|Ok...|  Wp...|
|      1_...|               0|20...|Av...|  SL...|
|      wE...|               0|20...|Al...|  mM...|
|      JL...|               0|20...|Th...|  Th...|
|      un...|               0|20...|Gr...|  xt...|
+-----------+----------------+-----+-----+-------+
only showing top 5 rows



## 2. Filter Tips by Valid Businesses

In [7]:
print("\n🔧 [2/5] Filtering tips by valid businesses...\n")

# Broadcast join (the business ID list is small)
valid_business_ids = df_business.select('business_id')

df_tips_filtered = df_tips.join(
    broadcast(valid_business_ids),
    on='business_id',
    how='inner'
)

# Persist: reused several times below
df_tips_filtered.persist()

count_tips_filtered = df_tips_filtered.count()

print(f"   📊 Original tips: {count_tips_total:,}")
print(f"   📊 Filtered tips: {count_tips_filtered:,}")
print(f"   📉 Reduction: {100 * (1 - count_tips_filtered/count_tips_total):.1f}%")
print(f"\n   ✅ Filtered!")


🔧 [2/5] Filtering tips by valid businesses...

   📊 Original tips: 908,915
   📊 Filtered tips: 471,886
   📉 Reduction: 48.1%

   ✅ Filtered!
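
The inner join above behaves as a pure filter only if `business_id` is unique in the business table; if an ID appeared twice there, each matching tip would be emitted twice. The intended semantics are a membership test, which Spark can also express with `how='left_semi'`. A minimal plain-Python sketch of that idea, using made-up IDs rather than the real data:

```python
# Hypothetical toy data; the IDs below are illustrative only.
tips = [
    {"business_id": "b1", "user_id": "u1"},
    {"business_id": "b2", "user_id": "u2"},
    {"business_id": "b1", "user_id": "u3"},
    {"business_id": "b9", "user_id": "u1"},  # business missing from the Silver table
]
valid_business_ids = {"b1", "b2"}  # a set, so every ID appears exactly once

# Keep a tip only if its business is valid; no tip can be duplicated.
tips_filtered = [t for t in tips if t["business_id"] in valid_business_ids]

# Same reduction formula as the notebook cell above.
reduction_pct = 100 * (1 - len(tips_filtered) / len(tips))
print(f"{len(tips)} -> {len(tips_filtered)} tips ({reduction_pct:.1f}% reduction)")
```

Whether switching the notebook to a semi join is worthwhile depends on whether the Silver business table guarantees unique IDs, which the source does not state.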


In [12]:
df_tips_filtered.show()

+--------------------+----------------+-------------------+--------------------+--------------------+
|         business_id|compliment_count|               date|                text|             user_id|
+--------------------+----------------+-------------------+--------------------+--------------------+
|EagkHaaC-kUozD3MP...|               0|2012-08-25 15:45:59|Okay totally get ...|WpCNsz24sa8WKPNjO...|
|wEdzUMaLE2ebYoe7Z...|               0|2015-07-29 03:44:09|Always epic :-) b...|mMbyHtXB0Spjs6HtB...|
|JLixvCikc5JYGcnva...|               0|2017-12-23 04:37:35|This place is ama...|ThxOrJ0-guAiHEk_H...|
|aUoMG97DMJG4nmwhT...|               0|2012-02-03 20:45:27|Lunch.... Jumbo l...|iTDRa5mQ0iW6RKifV...|
|fdmLML9X_0mWWG3TF...|               0|2010-11-12 00:07:10|Breakfast sampler...|RdGX12xRj4tYHSBrr...|
|YaEwp8emNzySe-d0M...|               0|2012-07-07 18:07:33|   Get the Jae Yook!|snDMqT-3QMWSP-ngv...|
|M-oi_kkI0AhbR-xqR...|               0|2014-05-24 01:08:13|The worst bbq I h...|rY

## 3. User Features (User Side)

In [8]:
print("\n👤 [3/5] Generating user features...\n")

df_user_features = df_tips_filtered \
    .groupBy('user_id') \
    .agg(
        count('business_id').alias('user_tip_count')
    )

count_users = df_user_features.count()
print(f"   📊 Total users with tips: {count_users:,}")

print("\n   📈 Distribution of tips per user:")
df_user_features.select('user_tip_count').summary().show()

print("   ✅ User features generated!")

# Preview the result
df_user_features.show()


👤 [3/5] Generating user features...

   📊 Total users with tips: 186,363

   📈 Distribution of tips per user:
+-------+------------------+
|summary|    user_tip_count|
+-------+------------------+
|  count|            186363|
|   mean|2.5320798656385657|
| stddev|  9.62150408823629|
|    min|                 1|
|    25%|                 1|
|    50%|                 1|
|    75%|                 2|
|    max|              1360|
+-------+------------------+

   ✅ User features generated!


+--------------------+--------------+
|             user_id|user_tip_count|
+--------------------+--------------+
|8DYVfcMbskCpm3tZl...|             6|
|gIOJu-di8dTiSlRYc...|             7|
|kmFCBdDDwnszzPjSM...|             3|
|9mEStNO5uorGiECK6...|             1|
|yvMtlZnDm3cdLSdjx...|             2|
|4Oysk2ZYl52ID-7xs...|             1|
|eOPh_nXTW4l_vk9j8...|             1|
|Ha5bMb0SfRcvX7CS0...|             1|
|KnzMF1NoBRqtJULZ0...|             1|
|cOS-MNurk0vTQf2QL...|             3|
|8_dUX7813Xuk4z_pQ...|             1|
|esji5clHjFmS-tU6_...|             2|
|ZsHK3rZszbTUF_Fkw...|             1|
|c8otGYB12t1mnR-VR...|             1|
|1K6DwmdsX7NUgZmbh...|             1|
|48rgWyQ4TjIPLDavO...|             1|
|Wx7PqffTfBmSRneSo...|             2|
|V9VDc3h0VHyH3yVQd...|             1|
|tCmgAsZptUbj5IpbP...|             1|
|HVzcdZlpUkXWYshwF...|             1|
+--------------------+--------------+
only showing top 20 rows



In [9]:
# Save user features
import os
import shutil

user_output_path = f'{SILVER_PATH}/tip_features_user'

# Remove the output directory if it already exists
if os.path.exists(user_output_path):
    shutil.rmtree(user_output_path)

df_user_features \
    .repartition(5) \
    .write \
    .mode('overwrite') \
    .option('compression', 'snappy') \
    .parquet(user_output_path)

print(f"\n   💾 Saved: {user_output_path}")
print(f"      Total: {count_users:,} users")


   💾 Saved: /home/jovyan/work/data/silver/tip_features_user
      Total: 186,363 users


## 4. Business Features (Item Side)

In [13]:
print("\n🏪 [4/5] Generating business features...\n")

# Convert date to a timestamp (the column is stored as a string)
df_tips_with_date = df_tips_filtered.withColumn(
    'date_timestamp',
    to_timestamp(col('date'), 'yyyy-MM-dd HH:mm:ss')
)

# Aggregate per business
df_business_features = df_tips_with_date \
    .groupBy('business_id') \
    .agg(
        count('user_id').alias('tip_count'),
        spark_max('date_timestamp').alias('last_tip_date')
    )

# Days elapsed since the last tip
df_business_features = df_business_features.withColumn(
    'days_since_last_tip',
    datediff(current_timestamp(), col('last_tip_date'))
)

count_businesses = df_business_features.count()
print(f"   📊 Businesses with tips: {count_businesses:,}")

# Statistics
print("\n   📈 Distribution of tips per business:")
df_business_features.select('tip_count', 'days_since_last_tip').summary().show()

print("   ✅ Business features generated!")


🏪 [4/5] Generating business features...

   📊 Businesses with tips: 44,904

   📈 Distribution of tips per business:
+-------+------------------+-------------------+
|summary|         tip_count|days_since_last_tip|
+-------+------------------+-------------------+
|  count|             44904|              44904|
|   mean| 10.50877427400677|  2721.316742383752|
| stddev|28.974293196464423| 1000.1498461319642|
|    min|                 1|               1413|
|    25%|                 1|               1818|
|    50%|                 3|               2538|
|    75%|                 8|               3408|
|    max|              2571|               5986|
+-------+------------------+-------------------+

   ✅ Business features generated!
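
Because `days_since_last_tip` is measured against `current_timestamp()`, every value grows by one for each day that passes before the notebook is rerun; the minimum of 1413 days above reflects the run date as much as the data. The sketch below reproduces the same computation in plain Python with a fixed reference date, which is one possible way to keep the feature reproducible. `REFERENCE_DATE` and the sample rows are assumptions for illustration, not part of the notebook.

```python
from datetime import datetime

# Hypothetical sample: (business_id, tip timestamp string) pairs.
tips = [
    ("b1", "2012-08-25 15:45:59"),
    ("b1", "2015-07-29 03:44:09"),
    ("b2", "2017-12-23 04:37:35"),
]

# Assumption: a fixed reference date instead of "now", for reproducible output.
REFERENCE_DATE = datetime(2018, 1, 1)

last_tip = {}
for business_id, date_str in tips:
    ts = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    # Keep the most recent timestamp per business (the max aggregation).
    if business_id not in last_tip or ts > last_tip[business_id]:
        last_tip[business_id] = ts

# Whole calendar days between the reference date and the last tip;
# the time of day is ignored, as with a date difference.
days_since_last_tip = {
    b: (REFERENCE_DATE.date() - ts.date()).days for b, ts in last_tip.items()
}
print(days_since_last_tip)
```

Since the recency score is min-max scaled afterwards, the choice of reference date shifts every business by the same number of days but still changes the scaled values, because 1 / (days + 7) is not linear in days.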


## 5. Mathematical Transformations (Log + Normalization)

In [14]:
print("\n🧪 [5/5] Applying mathematical transformations...\n")

# A. Log of popularity (tip_count)
print("   📐 Applying log1p to tip_count...")
df_features = df_business_features.withColumn(
    'tip_count_log',
    log1p(col('tip_count'))
)

# B. Recency score (inverse of the days)
# Score = 1 / (days + 7)
# The +7 smooths the score: it avoids division by zero and
# caps the score at 1/7 for a tip posted today
print("   📐 Computing recency score...")
df_features = df_features.withColumn(
    'recency_score',
    1.0 / (col('days_since_last_tip') + 7)
)

print("\n   ✅ Transformations applied!")


🧪 [5/5] Applying mathematical transformations...

   📐 Applying log1p to tip_count...
   📐 Computing recency score...

   ✅ Transformations applied!
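
To make the two formulas concrete, here is a small worked example in plain Python. It uses quantiles taken from the summary table in section 4 (tip counts 1, 3, 8, 2571 and day gaps 1413, 2538, 5986). It only illustrates the arithmetic and is not part of the pipeline.

```python
import math

tip_counts = [1, 3, 8, 2571]     # min, median, 75th percentile, max
days_since = [1413, 2538, 5986]  # min, median, max

# log1p(x) = ln(1 + x): compresses the long right tail of tip counts.
tip_count_log = [math.log1p(c) for c in tip_counts]

# Recency score = 1 / (days + 7): larger for more recent tips, at most 1/7.
recency_score = [1.0 / (d + 7) for d in days_since]

for c, v in zip(tip_counts, tip_count_log):
    print(f"tip_count={c:>5}  log1p={v:.3f}")
for d, s in zip(days_since, recency_score):
    print(f"days={d:>5}  recency_score={s:.6f}")
```

The raw counts span a factor of 2571 between the smallest and largest business, while their `log1p` values span a factor of only about 11, which is why the log is applied before scaling.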


In [15]:
from pyspark.sql.functions import udf

print("\n📊 Applying MinMaxScaler...\n")

# Columns to normalize
cols_to_normalize = ['tip_count_log', 'recency_score']

# Helper that pulls the single scaled value out of the ML vector.
# Defined once, outside the loop, instead of on every iteration.
def extract_value(vector):
    if vector is None:
        return 0.0
    return float(vector[0])

extract_udf = udf(extract_value, DoubleType())

# Apply MinMaxScaler to each column independently
for col_name in cols_to_normalize:
    print(f"   ⚙️  Normalizing {col_name}...")

    # 1. Assemble the column into a vector
    assembler = VectorAssembler(
        inputCols=[col_name],
        outputCol=f"{col_name}_vec"
    )
    df_features = assembler.transform(df_features)

    # 2. Fit and apply MinMaxScaler
    scaler = MinMaxScaler(
        inputCol=f"{col_name}_vec",
        outputCol=f"{col_name}_scaled"
    )
    scaler_model = scaler.fit(df_features)
    df_features = scaler_model.transform(df_features)

    # 3. Overwrite the original column with the scaled value
    df_features = df_features.withColumn(
        col_name,
        extract_udf(col(f"{col_name}_scaled"))
    )

    # 4. Drop the temporary columns
    df_features = df_features.drop(f"{col_name}_vec", f"{col_name}_scaled")

print("\n   ✅ MinMaxScaler applied!")


📊 Applying MinMaxScaler...

   ⚙️  Normalizing tip_count_log...
   ⚙️  Normalizing recency_score...

   ✅ MinMaxScaler applied!
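
With its default output range, `MinMaxScaler` rescales each value to (x - min) / (max - min), so the smallest value in the column maps to 0 and the largest to 1. The sketch below applies that formula in plain Python to a few `log1p` values; the helper name `min_max_scale` is hypothetical and the input counts are the quantiles from the summary table in section 4.

```python
import math

def min_max_scale(values):
    """Rescale values to [0, 1] with (x - min) / (max - min)."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Degenerate case: all values equal. Spark's MinMaxScaler documents
        # 0.5 * (max + min) of the target range here, i.e. 0.5 for [0, 1].
        return [0.5 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]

# log1p of tip counts 1, 3, 8 and 2571 (min, median, 75th percentile, max).
tip_count_log = [math.log1p(c) for c in [1, 3, 8, 2571]]
scaled = min_max_scale(tip_count_log)
print([round(s, 4) for s in scaled])
```

A business with a single tip therefore ends up with `tip_count_log` = 0.0, and one with three tips lands near 0.097, which is consistent with values that appear in the saved feature table.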


In [16]:
# Select the final columns
df_final = df_features.select(
    'business_id',
    'tip_count_log',
    'recency_score'
)

print("\n📊 Final features:\n")
df_final.show(10)

# Statistics of the normalized features
print("\n📈 Statistics of the normalized features (should be between 0 and 1):")
df_final.select('tip_count_log', 'recency_score').summary().show()


📊 Final features:

+--------------------+-------------------+-------------------+
|         business_id|      tip_count_log|      recency_score|
+--------------------+-------------------+-------------------+
|PNby7mawC0ecfg-uE...| 0.2381168577713732| 0.9384320280843057|
|P2XJbQZmf1zvWp9L_...|0.39573932474489987| 0.9097675952579826|
|YdZS3QkpjgHU2zIJ_...| 0.3782567098971756| 0.6541971000475326|
|PQYuPr2Dfm2S49EUL...|  0.504368024193075| 0.6910618958048563|
|rQyJXOiZ39eRJ2l9O...| 0.3582685818052369| 0.8297611084670037|
|bWgi0dQSRbTa44YZS...| 0.2381168577713732| 0.6345990226601098|
|0wZJkj-OnZ7Pmubls...| 0.5491919710788076| 0.9844963045171038|
|BJOGo_upuBElDT_xO...|0.46035793900172334| 0.9375943686024595|
|2IahpaBR4U2Kdy9HF...| 0.5622555616202339|0.49126370027020144|
|-f5A6KKUu4jZN1qpt...|0.09681784033634044| 0.9460218677783597|
+--------------------+-------------------+-------------------+
only showing top 10 rows


📈 Statistics of the normalized features (should be between 0 and 1):

In [18]:
df_final.count()


44904

## 6. Save to the Silver Layer

In [19]:
print("\n💾 Saving business features to the Silver layer...\n")

business_output_path = f'{SILVER_PATH}/tip_features_business'

# Remove the output directory if it already exists
if os.path.exists(business_output_path):
    shutil.rmtree(business_output_path)
    print(f"   🗑️  Removed old output: {business_output_path}")

# Save
df_final \
    .repartition(5) \
    .write \
    .mode('overwrite') \
    .option('compression', 'snappy') \
    .parquet(business_output_path)

print(f"\n{'='*60}")
print(f"✅ BUSINESS FEATURES SAVED: {business_output_path}")
print(f"   📊 Total: {count_businesses:,} businesses")
print(f"   📦 Partitions: 5")
print(f"   🗜️  Compression: SNAPPY")
print(f"{'='*60}")


💾 Saving business features to the Silver layer...


✅ BUSINESS FEATURES SAVED: /home/jovyan/work/data/silver/tip_features_business
   📊 Total: 44,904 businesses
   📦 Partitions: 5
   🗜️  Compression: SNAPPY


In [20]:
# Verify the saved files
print("\n🔍 Verifying saved files...\n")

# User features
df_user_verify = spark.read.parquet(user_output_path)
print(f"✅ User features: {df_user_verify.count():,} records")
print("   Columns:", df_user_verify.columns)

# Business features
df_business_verify = spark.read.parquet(business_output_path)
print(f"\n✅ Business features: {df_business_verify.count():,} records")
print("   Columns:", df_business_verify.columns)

print("\n📊 Sample of the final result:")
df_business_verify.show(5)


🔍 Verifying saved files...

✅ User features: 186,363 records
   Columns: ['user_id', 'user_tip_count']

✅ Business features: 44,904 records
   Columns: ['business_id', 'tip_count_log', 'recency_score']

📊 Sample of the final result:
+--------------------+-------------------+------------------+
|         business_id|      tip_count_log|     recency_score|
+--------------------+-------------------+------------------+
|u0wF_sqDWzJ3oVq1I...|0.17498420027487216|0.4317776833220361|
|T_pj13mZUV_2YirOF...|                0.0| 0.750448632484408|
|vOe2KAgnb9dA6Np66...|                0.0|0.6825105177459176|
|SG1GdORwoqxl_WEAJ...|                0.0|0.5038957136383712|
|_Q56kbKTbYW6dT2HC...| 0.2989214844085594|0.2501734936720353|
+--------------------+-------------------+------------------+
only showing top 5 rows



In [21]:
# Free memory
print("\n🧹 Clearing cache...")
df_tips_filtered.unpersist()
spark.catalog.clearCache()
print("✅ Cache cleared!")

print("\n" + "="*60)
print("🎉 PROCESSING COMPLETE!")
print("="*60)
print("\n📁 Generated files:")
print(f"   1. {user_output_path}")
print(f"   2. {business_output_path}")
print("\n🔄 Next step:")
print("   Run the notebook: 03_checkin_feature_engineering_SPARK.ipynb")
print("="*60)


🧹 Clearing cache...
✅ Cache cleared!

🎉 PROCESSING COMPLETE!

📁 Generated files:
   1. /home/jovyan/work/data/silver/tip_features_user
   2. /home/jovyan/work/data/silver/tip_features_business

🔄 Next step:
   Run the notebook: 03_checkin_feature_engineering_SPARK.ipynb


---

## 📊 Pipeline Summary

**Input:**
- `bronze/tip` - Raw tips (~900K)
- `silver/business` - Filtered businesses (~64K)

**Processing:**
1. ✅ Filter tips to valid businesses (~471K tips)
2. ✅ Aggregate by user_id → user_tip_count
3. ✅ Aggregate by business_id:
   - tip_count (count)
   - last_tip_date (most recent date)
   - days_since_last_tip (days since the last tip)
4. ✅ Transformations:
   - log1p(tip_count), i.e. log(1 + tip_count)
   - Recency score = 1/(days+7)
   - MinMaxScaler [0-1]

**Output:**
- `silver/tip_features_user` - Features per user
- `silver/tip_features_business` - Features per business

**Final Features (Business):**
- `business_id` - Unique ID
- `tip_count_log` - Normalized popularity [0-1]
- `recency_score` - Normalized recent activity [0-1]

**Performance:**
- Pandas: 5-10 minutes
- Spark: 1-2 minutes ⚡
- **Speedup: roughly 5-10x faster**

---