# ⚽ FASE 1: Download EPL Dataset → Delta Lake

**Sin usar DBFS (compatible con Community Edition)**

---

## 1. Descargar y Descomprimir

In [0]:
%sh
# Download the EPL dataset archive into /tmp and unpack it there.
# Abort on the first failing command so a failed download is never unzipped.
set -e
cd /tmp
# -f makes curl exit non-zero on an HTTP error instead of saving the error
# response as epl_dataset.zip; -L follows HTTP redirects.
curl -fL -o epl_dataset.zip \
  https://www.kaggle.com/api/v1/datasets/download/mohamadsallah5/english-premier-league-stats20212024

# -o overwrites files extracted by a previous run without prompting.
unzip -o epl_dataset.zip

echo "Files downloaded:"
ls -lh *.csv | head -10

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 60495  100 60495    0     0  85889      0 --:--:-- --:--:-- --:--:-- 85889


Archive:  epl_dataset.zip
  inflating: mydata.csv              
Files downloaded:
-rw-r--r-- 1 spark-7dada2e1-f834-4439-9100-53 spark-7dada2e1-f834-4439-9100-53 243K Aug 18 14:27 mydata.csv


## 2. Listar CSVs disponibles

In [0]:
import os
import glob

# Collect every CSV directly under /tmp. glob.glob() returns paths in an
# arbitrary, filesystem-dependent order, so sort them to keep the listing
# (and any later code that iterates csv_files) deterministic across runs.
csv_files = sorted(glob.glob('/tmp/*.csv'))

MAX_LISTED = 10  # cap on how many files are printed individually

print("=" * 60)
print(f"📊 CSVs encontrados: {len(csv_files)}")
print("=" * 60)

for csv_file in csv_files[:MAX_LISTED]:
    filename = os.path.basename(csv_file)
    size_mb = os.path.getsize(csv_file) / (1024 * 1024)
    print(f"  - {filename} ({size_mb:.2f} MB)")

# Summarize whatever did not fit in the capped listing.
if len(csv_files) > MAX_LISTED:
    print(f"  ... y {len(csv_files) - MAX_LISTED} más")

print("=" * 60)

📊 CSVs encontrados: 1
  - mydata.csv (0.24 MB)


## 3. Cargar CSVs con Pandas → Spark

**Sin usar DBFS, directo desde /tmp**

In [0]:
import pandas as pd
from pyspark.sql import SparkSession

import re  # imported here so this cell does not rely on another cell's import

print("📊 Cargando datos...")
print("=" * 60)

# Maps a sanitized table name to the Spark DataFrame loaded from its CSV.
dataframes = {}

for csv_file in csv_files:
    filename = os.path.basename(csv_file)
    # Drop only the trailing extension; str.replace('.csv', '') would also
    # remove a '.csv' that appears in the middle of the file name.
    stem = os.path.splitext(filename)[0].lower()
    # Replace every character outside [a-z0-9_] (not just '-', ' ' and '.')
    # so the result is safe to embed in a table name.
    table_name = re.sub(r'[^a-z0-9_]', '_', stem)

    try:
        # Read with pandas straight from /tmp, then hand the frame to Spark.
        print(f"\n📖 {filename}")
        pdf = pd.read_csv(csv_file)

        df = spark.createDataFrame(pdf)

        count = len(pdf)
        cols = len(pdf.columns)

        dataframes[table_name] = df
        print(f"   ✅ {count:,} registros, {cols} columnas")

    except Exception as e:
        # Best effort: report the failure (truncated) and continue with the
        # remaining files instead of aborting the whole load.
        print(f"   ❌ Error: {str(e)[:100]}")

print("\n" + "=" * 60)
print(f"✅ Total: {len(dataframes)} DataFrames cargados")
print("=" * 60)

📊 Cargando datos...

📖 mydata.csv
   ✅ 1,140 registros, 40 columnas

✅ Total: 1 DataFrames cargados


## 4. Preview de datos

In [0]:
# Preview the first loaded DataFrame, or say so when nothing was loaded.
if not dataframes:
    print("⚠️ No hay datos cargados")
else:
    # dicts keep insertion order, so this is the first table that was loaded.
    first_name, first_df = next(iter(dataframes.items()))

    print(f"📋 Preview: {first_name}")
    print(f"   Registros: {first_df.count():,}")
    print(f"   Columnas: {len(first_df.columns)}")
    print()

    display(first_df.limit(5))

📋 Preview: mydata
   Registros: 1,140
   Columnas: 40



date,clock,stadium,class,attendance,Home Team,Goals Home,Away Team,Away Goals,home_possessions,away_possessions,home_shots,away_shots,home_on,away_on,home_off,away_off,home_blocked,away_blocked,home_pass,away_pass,home_chances,away_chances,home_corners,away_corners,home_offside,away_offside,home_tackles,away_tackles,home_duels,away_duels,home_saves,away_saves,home_fouls,away_fouls,home_yellow,away_yellow,home_red,away_red,links
28th May 2023,4:30pm,Emirates Stadium,h,60095,2,5,13,0,51.0,49.0,14,6,8,0,4,4,2,2,89.0,88.0,3,0,8,4,1,0,82.4,44.4,47.8,52.2,0,3,8,11,0,0,0,0,https://www.skysports.com/football/arsenal-vs-wolverhampton-wanderers/465005
28th May 2023,4:30pm,Villa Park,h,42212,7,2,6,1,40.3,59.7,12,8,5,4,5,3,2,1,75.3,83.6,4,3,4,3,0,6,42.9,15.4,52.2,47.8,3,3,15,16,4,4,0,0,https://www.skysports.com/football/aston-villa-vs-brighton-and-hove-albion/465006
28th May 2023,4:30pm,Gtech Community Stadium,h,17120,9,1,1,0,34.4,65.6,11,17,4,3,4,6,3,8,79.3,89.8,2,1,3,4,3,0,64.7,35.7,50.0,50.0,2,3,12,8,4,0,0,0,https://www.skysports.com/football/brentford-vs-manchester-city/465007
28th May 2023,4:30pm,Stamford Bridge,d,40130,12,1,4,1,64.4,35.6,22,13,5,4,9,5,8,4,88.9,83.3,2,2,10,3,2,1,42.9,42.9,54.5,45.5,3,5,9,11,0,0,0,0,https://www.skysports.com/football/chelsea-vs-newcastle-united/465008
28th May 2023,4:30pm,Selhurst Park,d,25198,11,1,16,1,66.0,34.0,15,7,3,4,8,2,4,1,85.7,69.9,1,0,5,4,2,2,40.0,52.6,58.3,41.7,3,2,9,13,0,2,0,0,https://www.skysports.com/football/crystal-palace-vs-nottingham-forest/465009


## 5. Guardar en Delta Lake

In [0]:
  import re

  # Asegurarse de que la función existe
  def clean_column_name(col_name):
      """Normalize a column name to lowercase ``[a-z0-9_]`` characters.

      The value is converted with ``str()`` and lowercased, every character
      outside ``a-z``, ``0-9`` and ``_`` is replaced by an underscore, runs
      of underscores are collapsed to one, and leading/trailing underscores
      are removed.

      Returns the cleaned name, which is an empty string when the input has
      no ASCII letter or digit.
      """
      lowered = str(col_name).lower()
      underscored = re.sub(r'[^a-z0-9_]', '_', lowered)
      collapsed = re.sub(r'_+', '_', underscored)
      return collapsed.strip('_')

  # Nothing was loaded: report it and end the notebook run via dbutils.
  if not dataframes:
      print("❌ No hay datos para guardar")
      dbutils.notebook.exit("No data")

  print("=" * 60)
  print("💾 GUARDANDO EN DELTA LAKE")
  print("=" * 60)

  failed_tables = []  # Delta table names whose write raised; reported at the end

  for table_name, df in dataframes.items():
      delta_name = f"football_{table_name}_raw"

      print(f"\n📝 Procesando: {delta_name}")

      # Show the first 5 original column names.
      orig_cols = df.columns[:5]
      print(f"   Columnas originales: {orig_cols}")

      # Clean the column names. Distinct originals can clean to the same
      # string (e.g. "Home Team" and "home_team") or to an empty string,
      # either of which would make toDF()/the write fail, so fall back to a
      # positional name and add a numeric suffix on collisions.
      used_names = set()
      clean_cols = []
      for position, raw_col in enumerate(df.columns):
          base = clean_column_name(raw_col) or f"col_{position}"
          candidate = base
          suffix = 2
          while candidate in used_names:
              candidate = f"{base}_{suffix}"
              suffix += 1
          used_names.add(candidate)
          clean_cols.append(candidate)
      print(f"   Columnas limpias: {clean_cols[:5]}")

      # Build a DataFrame that carries the cleaned names.
      df_clean = df.toDF(*clean_cols)

      try:
          # overwriteSchema lets the full overwrite also replace the schema of
          # a table left by an earlier run; without it Delta rejects a write
          # whose columns differ from the existing table.
          (
              df_clean.write.format("delta")
              .mode("overwrite")
              .option("overwriteSchema", "true")
              .saveAsTable(delta_name)
          )
          count = spark.table(delta_name).count()
          print(f"   ✅ {count:,} registros guardados")
      except Exception as e:
          # Best effort: keep processing the remaining tables, but remember
          # the failure so the final banner does not claim success.
          failed_tables.append(delta_name)
          print(f"   ❌ Error: {str(e)[:200]}")

  print("\n" + "=" * 60)
  if failed_tables:
      print(f"⚠️ FASE 1 COMPLETADA CON ERRORES: {len(failed_tables)} tabla(s) no guardada(s): {failed_tables}")
  else:
      print("🎉 FASE 1 COMPLETADA")
  print("=" * 60)
    

💾 GUARDANDO EN DELTA LAKE

📝 Procesando: football_mydata_raw
   Columnas originales: ['date', 'clock', 'stadium', 'class', 'attendance']
   Columnas limpias: ['date', 'clock', 'stadium', 'class', 'attendance']
   ✅ 1,140 registros guardados

🎉 FASE 1 COMPLETADA


## 6. Verificar Delta Tables

In [0]:
from pyspark.sql.functions import col

# List the tables whose name starts with the literal prefix "football_".
# startswith() is used instead of LIKE because "_" in a LIKE pattern is a
# single-character wildcard, so "football_%" would also match names such as
# "footballX...".
tables = (
    spark.sql("SHOW TABLES")
    .filter(col("tableName").startswith("football_"))
    .collect()
)

print("=" * 60)
print(f"📊 DELTA TABLES: {len(tables)}")
print("=" * 60)

for table in tables:
    name = table['tableName']
    table_df = spark.table(name)  # resolve the table once and reuse it for both metrics
    count = table_df.count()
    cols = len(table_df.columns)
    print(f"  - {name}: {count:,} registros, {cols} columnas")

print("\n" + "=" * 60)
print("✅ Listo para FASE 2: Feature Engineering")
print("=" * 60)

📊 DELTA TABLES: 7
  - football_matches_clean: 1,140 registros, 44 columnas
  - football_mydata_raw: 1,140 registros, 40 columnas
  - football_team_form: 2,280 registros, 20 columnas
  - football_team_matches: 2,280 registros, 15 columnas
  - football_team_names: 25 registros, 2 columnas
  - football_team_stats: 25 registros, 17 columnas
  - football_team_venue_stats: 25 registros, 11 columnas

✅ Listo para FASE 2: Feature Engineering


## Resumen

**Completado:**
- ✅ Descarga de dataset EPL
- ✅ Carga de CSVs (sin DBFS)
- ✅ Guardado en Delta Lake

**Próximo paso:**
- FASE 2: Spark Pipeline + Feature Engineering