In [44]:
# Utilities for defining the metadata schema
from pyspark.sql.types import StructType, StructField

#Data types that will be assigned to each field
from pyspark.sql.types import StringType, IntegerType, DoubleType

#pandas API on Spark: the pandas-compatible library for Big Data cluster environments
import pyspark.pandas as pd

#By default a pandas-on-Spark dataframe displays up to 1000 rows
#Limit the display to 20 rows so the notebook output does not get flooded
pd.set_option("display.max_rows", 20)

#Library for working with AWS services
import boto3

#JSON utility library
import json

Calculation started (calculation_id=9cc921cb-a895-431f-4fd9-823b42b98bd9) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [45]:
#Name of the bucket the file is read from
#NOTE: the bucket name is user-specific (the original template asked to replace an
#"XXX" placeholder with your initials) - change it to your own bucket
bucket = "datasetsbdajac"

#Path of the data inside the bucket
rutaDeArchivo = "data/bodyperformance"

#Field name / data type pairs, in the same order as the columns of the file
camposDelEsquema = [
    ("age", DoubleType()),
    ("gender", StringType()),
    ("height_cm", DoubleType()),
    ("weight_kg", DoubleType()),
    ("body fat_%", DoubleType()),
    ("diastolic", DoubleType()),
    ("systolic", DoubleType()),
    ("gripForce", DoubleType()),
    ("sit and bend forward_cm", DoubleType()),
    ("sit-ups counts", DoubleType()),
    ("broad jump_cm", DoubleType()),
    ("class", StringType()),
]

#Metadata schema: every field is declared nullable
schema = StructType(
    [StructField(nombre, tipo, True) for nombre, tipo in camposDelEsquema]
)

#Categorical feature columns
categorias = ["gender"]

#Label column
label = "class"

Calculation started (calculation_id=d0c921cb-b2b2-97ce-ab8f-fd6326aec333) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [46]:
#Build the S3 path the raw data file is read from
#The f-string substitutes the bucket name and the path inside the bucket
rutaArchivoRaw = f"s3://{bucket}/{rutaDeArchivo}/"

#Check the result
print(rutaArchivoRaw)



Calculation started (calculation_id=84c921cb-b7fd-b770-5954-6fd9ba286e46) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
s3://datasetsbdajac/data/bodyperformance/



In [47]:
#Configure the CSV reader: header row present, comma separated, ISO-8859-1 encoded,
#and columns typed with the explicit schema
lectorCsv = (
    spark.read.format("csv")
    .options(header="true", delimiter=",", encoding="ISO-8859-1")
    .schema(schema)
)

#Read the data
dfRaw = lectorCsv.load(rutaArchivoRaw)

#Show the metadata schema
dfRaw.printSchema()

#Check the result
dfRaw.show()

Calculation started (calculation_id=28c921cb-c753-e433-77c4-75f7b1cc860f) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
root
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- height_cm: double (nullable = true)
 |-- weight_kg: double (nullable = true)
 |-- body fat_%: double (nullable = true)
 |-- diastolic: double (nullable = true)
 |-- systolic: double (nullable = true)
 |-- gripForce: double (nullable = true)
 |-- sit and bend forward_cm: double (nullable = true)
 |-- sit-ups counts: double (nullable = true)
 |-- broad jump_cm: double (nullable = true)
 |-- class: string (nullable = true)

+----+------+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+-----+
| age|gender|height_cm|weight_kg|body fat_%|diastolic|systolic|gripForce|sit and bend forward_cm|sit-ups counts|broad jump_cm|class|
+----+------+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+-----+
|27.0|     M|    172.3|    75.24|      21.3|     80.0|   130.0|     54

In [48]:
#Convert the Spark dataframe into a pandas-on-Spark dataframe (pd is pyspark.pandas)
#toPandas() first materialises the rows as a local pandas dataframe,
#which pd.from_pandas() then converts
dfpRaw = pd.from_pandas(dfRaw.toPandas())

#Check the result
dfpRaw

Calculation started (calculation_id=c6c921cb-e13f-5209-fa87-6e499f56ab50) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
     age gender  height_cm  weight_kg  body fat_%  diastolic  systolic  gripForce  sit and bend forward_cm  sit-ups counts  broad jump_cm class
0   27.0      M      172.3      75.24        21.3       80.0     130.0       54.9                     18.4            60.0          217.0     C
1   25.0      M      165.0      55.80        15.7       77.0     126.0       36.4                     16.3            53.0          229.0     A
2   31.0      M      179.6      78.00        20.1       92.0     152.0       44.8                     12.0            49.0          181.0     C
3   32.0      M      174.5      71.10        18.4       76.0     147.0       41.4                     15.2            53.0          219.0     B
4   28.0      M      173.8      67.70        17.1       70.0     127.0       43.5                     27.1            45.0          217.0     B
5   36.0      F      165.4      55.40        22.0       64.0     119.0       23.8                     21.0       

In [49]:
#Sorted unique values of every categorical column, keyed by column name
valoresUnicosDeCadaCategoria = {
    categoria: sorted(dfpRaw[categoria].unique().tolist())
    for categoria in categorias
}

#Check the result
print(valoresUnicosDeCadaCategoria)

Calculation started (calculation_id=3cc921cb-fe0a-c6ef-333d-498f7430fef8) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
{'gender': ['F', 'M']}



In [50]:
#Collect the unique categorical values of the label column and sort them
valoresDelLabel = dfpRaw[label].unique().tolist()
valoresDelLabel.sort()

#Store them next to the unique values of the categorical features
valoresUnicosDeCadaCategoria[label] = valoresDelLabel

#Check the result
print(valoresUnicosDeCadaCategoria)

Calculation started (calculation_id=8cc921cc-167a-a607-e673-8900fbc9711e) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
{'gender': ['F', 'M'], 'class': ['A', 'B', 'C', 'D']}



In [51]:
#Convert the categorical feature columns listed in "categorias" into numeric dummy columns
dfpDataset = pd.get_dummies(dfpRaw, columns = categorias)

#Check the result
dfpDataset

Calculation started (calculation_id=22c921cc-2813-18dc-d1b4-e504e690380c) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
     age  height_cm  weight_kg  body fat_%  diastolic  systolic  gripForce  sit and bend forward_cm  sit-ups counts  broad jump_cm class  gender_F  gender_M
0   27.0      172.3      75.24        21.3       80.0     130.0       54.9                     18.4            60.0          217.0     C         0         1
1   25.0      165.0      55.80        15.7       77.0     126.0       36.4                     16.3            53.0          229.0     A         0         1
2   31.0      179.6      78.00        20.1       92.0     152.0       44.8                     12.0            49.0          181.0     C         0         1
3   32.0      174.5      71.10        18.4       76.0     147.0       41.4                     15.2            53.0          219.0     B         0         1
4   28.0      173.8      67.70        17.1       70.0     127.0       43.5                     27.1            45.0          217.0     B         0         1
5   36.0      165.4      55.40     

In [52]:
#This entry holds the categorical values of the label column
valoresUnicosDeCadaCategoria[label]

Calculation started (calculation_id=42c921cc-393e-9422-b697-9b4bb4b6342a) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
['A', 'B', 'C', 'D']



In [53]:
#Mapping from each label category to a numeric code; the target structure is
#{"A": 0, "B": 1, "C": 2, "D": 3}
mapeoDeCategoriasLabel = {}

Calculation started (calculation_id=c4c921cc-3a25-c040-29b7-a26ceb75cafb) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [54]:
#Starting index for the numeric label codes
i = 0

Calculation started (calculation_id=70c921cc-3b23-0f4b-6ff7-b06ad140a84c) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [55]:
#Iteramos cada valor categorico del label
for valorCategorico in valoresUnicosDeCadaCategoria[label]:
    #Colocamos el valor numerico con el que se reemplazara
    mapeoDeCategoriasLabel[valorCategorico] = i

    #Aumentamos el indice en 1 para el siguiente valor categorico
    i = i + 1

Calculation started (calculation_id=88c921cc-3c1c-2e45-1243-e400e4854039) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [56]:
#Check the resulting label mapping
mapeoDeCategoriasLabel

Calculation started (calculation_id=72c921cc-3d2b-61a0-096f-68a2e504ed4b) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
{'A': 0, 'B': 1, 'C': 2, 'D': 3}



In [57]:
#Replace the categorical values of the label column with the codes in mapeoDeCategoriasLabel
dfpDataset[label] = dfpDataset[label].replace(mapeoDeCategoriasLabel)

Calculation started (calculation_id=e4c921cc-48d0-893a-5736-c2427ca3e9ee) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [58]:
#Display the dataset after the label column has been replaced
dfpDataset

Calculation started (calculation_id=7cc921cc-4c93-34ba-c397-05c0a34b9e1a) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
     age  height_cm  weight_kg  body fat_%  diastolic  systolic  gripForce  sit and bend forward_cm  sit-ups counts  broad jump_cm class  gender_F  gender_M
0   27.0      172.3      75.24        21.3       80.0     130.0       54.9                     18.4            60.0          217.0     2         0         1
1   25.0      165.0      55.80        15.7       77.0     126.0       36.4                     16.3            53.0          229.0     0         0         1
2   31.0      179.6      78.00        20.1       92.0     152.0       44.8                     12.0            49.0          181.0     2         0         1
3   32.0      174.5      71.10        18.4       76.0     147.0       41.4                     15.2            53.0          219.0     1         0         1
4   28.0      173.8      67.70        17.1       70.0     127.0       43.5                     27.1            45.0          217.0     1         0         1
5   36.0      165.4      55.40     

In [59]:
#Display only the "class" (label) column
dfpDataset["class"]


Calculation started (calculation_id=a6c921cc-59c0-3ca7-3b2e-bb255463f289) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
0     2
1     0
2     2
3     1
4     1
5     1
6     3
7     1
8     2
9     1
10    0
11    3
12    2
13    2
14    2
15    0
16    2
17    0
18    1
19    1
Name: class, dtype: object
Showing only the first 20



In [60]:
#Convert the pandas-on-Spark dataframe back into a Spark dataframe
dfDataset = dfpDataset.to_spark()

#Check the result
dfDataset.show()

Calculation started (calculation_id=76c921cc-6257-8926-ce12-71aab2a126df) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
+----+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+-----+--------+--------+
| age|height_cm|weight_kg|body fat_%|diastolic|systolic|gripForce|sit and bend forward_cm|sit-ups counts|broad jump_cm|class|gender_F|gender_M|
+----+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+-----+--------+--------+
|27.0|    172.3|    75.24|      21.3|     80.0|   130.0|     54.9|                   18.4|          60.0|        217.0|    2|       0|       1|
|25.0|    165.0|     55.8|      15.7|     77.0|   126.0|     36.4|                   16.3|          53.0|        229.0|    0|       0|       1|
|31.0|    179.6|     78.0|      20.1|     92.0|   152.0|     44.8|                   12.0|          49.0|        181.0|    2|       0|       1|
|32.0|    174.5|     71.1|      18.4|     76.0|   147.0|     41.4|                   15.2|          53.0|        

In [61]:
#Move the LABEL column to the FIRST position, followed by every feature column
#The feature list is derived from the dataframe itself (all columns except the label,
#in their existing order), so it stays correct if the categorical columns, and
#therefore the generated dummy columns, change
columnasFeatures = [columna for columna in dfDataset.columns if columna != label]

dfDatasetOrdenado = dfDataset.select(
    *[dfDataset[columna] for columna in [label] + columnasFeatures]
)

#Check the result
dfDatasetOrdenado.show()

Calculation started (calculation_id=e6c921cc-6f84-712c-1504-d73b29f29adb) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
+-----+----+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+--------+--------+
|class| age|height_cm|weight_kg|body fat_%|diastolic|systolic|gripForce|sit and bend forward_cm|sit-ups counts|broad jump_cm|gender_F|gender_M|
+-----+----+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+--------+--------+
|    2|27.0|    172.3|    75.24|      21.3|     80.0|   130.0|     54.9|                   18.4|          60.0|        217.0|       0|       1|
|    0|25.0|    165.0|     55.8|      15.7|     77.0|   126.0|     36.4|                   16.3|          53.0|        229.0|       0|       1|
|    2|31.0|    179.6|     78.0|      20.1|     92.0|   152.0|     44.8|                   12.0|          49.0|        181.0|       0|       1|
|    1|32.0|    174.5|     71.1|      18.4|     76.0|   147.0|     41.4|                   15.2|          53.0|  

In [62]:
#Utilitarios de Spark
import pyspark.sql.functions as f

Calculation started (calculation_id=52c921cc-7cd9-4419-6fe1-625f1ad35e0a) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [63]:
#Add an "indice_fila" column holding the id generated by monotonically_increasing_id()
dfIndice = dfDatasetOrdenado.withColumn("indice_fila", f.monotonically_increasing_id())


Calculation started (calculation_id=9cc921cc-8c29-c9fb-1237-065f18a3ff74) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [64]:
#Check the dataframe with the new "indice_fila" column
dfIndice.show()

Calculation started (calculation_id=08c921cc-9bf4-c0be-ec97-c8508c741f6d) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
+-----+----+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+--------+--------+-----------+
|class| age|height_cm|weight_kg|body fat_%|diastolic|systolic|gripForce|sit and bend forward_cm|sit-ups counts|broad jump_cm|gender_F|gender_M|indice_fila|
+-----+----+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+--------+--------+-----------+
|    2|27.0|    172.3|    75.24|      21.3|     80.0|   130.0|     54.9|                   18.4|          60.0|        217.0|       0|       1|          0|
|    0|25.0|    165.0|     55.8|      15.7|     77.0|   126.0|     36.4|                   16.3|          53.0|        229.0|       0|       1|          1|
|    2|31.0|    179.6|     78.0|      20.1|     92.0|   152.0|     44.8|                   12.0|          49.0|        181.0|       0|       1|          2|
|    1|32.0|    174.5|     71.1|      18.

In [65]:
#Show only the "indice_fila" column: up to 10000 rows, without truncating the values
#Author's note: the generated ids are not necessarily consecutive
dfIndice.select("indice_fila").show(n=10000, truncate=False)

Calculation started (calculation_id=94c921cc-a91e-ca20-4410-2639487477ff) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
+-----------+
|indice_fila|
+-----------+
|0          |
|1          |
|2          |
|3          |
|4          |
|5          |
|6          |
|7          |
|8          |
|9          |
|10         |
|11         |
|12         |
|13         |
|14         |
|15         |
|16         |
|17         |
|18         |
|19         |
|20         |
|21         |
|22         |
|23         |
|24         |
|25         |
|26         |
|27         |
|28         |
|29         |
|30         |
|31         |
|32         |
|33         |
|34         |
|35         |
|36         |
|37         |
|38         |
|39         |
|40         |
|41         |
|42         |
|43         |
|44         |
|45         |
|46         |
|47         |
|48         |
|49         |
|50         |
|51         |
|52         |
|53         |
|54         |
|55         |
|56         |
|57         |
|58         |
|59         |
|60         |
|61         |
|62         |
|63         |
|64         |
|65         |
|66        

In [66]:
#Window lets us define the column used to order the rows
from pyspark.sql.window import Window

#Order the rows by the "indice_fila" column
ventanaOrdenada = Window.orderBy("indice_fila")

#Add the "indice_fila_2" column using row_number() over the ordered window,
#so the index is generated following the order of "indice_fila"
dfIndice = dfIndice.withColumn("indice_fila_2", f.row_number().over(ventanaOrdenada))

Calculation started (calculation_id=14c921cc-b666-b0c7-2c42-094485293f9b) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [67]:
#Show only the "indice_fila_2" column: up to 10000 rows, without truncating the values
dfIndice.select("indice_fila_2").show(n=10000, truncate=False)

Calculation started (calculation_id=aec921cc-bb9c-eeaf-1135-ccd7a93100e9) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
+-------------+
|indice_fila_2|
+-------------+
|1            |
|2            |
|3            |
|4            |
|5            |
|6            |
|7            |
|8            |
|9            |
|10           |
|11           |
|12           |
|13           |
|14           |
|15           |
|16           |
|17           |
|18           |
|19           |
|20           |
|21           |
|22           |
|23           |
|24           |
|25           |
|26           |
|27           |
|28           |
|29           |
|30           |
|31           |
|32           |
|33           |
|34           |
|35           |
|36           |
|37           |
|38           |
|39           |
|40           |
|41           |
|42           |
|43           |
|44           |
|45           |
|46           |
|47           |
|48           |
|49           |
|50           |
|51           |
|52           |
|53           |
|54           |
|55           |
|56           |
|57           |
|58           |
|

In [68]:
#Get the total number of rows
numeroDeRegistros = dfIndice.count()

#Check the result
print(numeroDeRegistros)

Calculation started (calculation_id=32c921cc-dcf0-b5b2-2187-e7aa19454295) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
13393



In [70]:
#Number of rows that make up 20% of the dataset (one fifth, rounded down)
cantidadDeRegistrosValidacion = numeroDeRegistros // 5

#Check the result
print(cantidadDeRegistrosValidacion)

Calculation started (calculation_id=aec921cd-137c-228e-b62a-14d98aa93ca2) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
2678



In [71]:
#FIRST fold: rows whose "indice_fila_2" lies in [0, cantidadDeRegistrosValidacion)
enPrimerCorte = (
    (dfIndice["indice_fila_2"] >= 0)
    & (dfIndice["indice_fila_2"] < cantidadDeRegistrosValidacion)
)

#Keep those rows and discard the two helper index columns
df1 = dfIndice.filter(enPrimerCorte).drop("indice_fila", "indice_fila_2")

#Check the result
df1.show()

Calculation started (calculation_id=e4c921cd-1d3c-09a2-07b5-f129a58b2fe9) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
+-----+----+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+--------+--------+
|class| age|height_cm|weight_kg|body fat_%|diastolic|systolic|gripForce|sit and bend forward_cm|sit-ups counts|broad jump_cm|gender_F|gender_M|
+-----+----+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+--------+--------+
|    2|27.0|    172.3|    75.24|      21.3|     80.0|   130.0|     54.9|                   18.4|          60.0|        217.0|       0|       1|
|    0|25.0|    165.0|     55.8|      15.7|     77.0|   126.0|     36.4|                   16.3|          53.0|        229.0|       0|       1|
|    2|31.0|    179.6|     78.0|      20.1|     92.0|   152.0|     44.8|                   12.0|          49.0|        181.0|       0|       1|
|    1|32.0|    174.5|     71.1|      18.4|     76.0|   147.0|     41.4|                   15.2|          53.0|  

In [72]:
#SECOND fold: rows whose "indice_fila_2" lies in
#[cantidadDeRegistrosValidacion, 2*cantidadDeRegistrosValidacion)
enSegundoCorte = (
    (dfIndice["indice_fila_2"] >= cantidadDeRegistrosValidacion)
    & (dfIndice["indice_fila_2"] < 2*cantidadDeRegistrosValidacion)
)

#Keep those rows and discard the two helper index columns
df2 = dfIndice.filter(enSegundoCorte).drop("indice_fila", "indice_fila_2")

#Check the result
df2.show()

Calculation started (calculation_id=e6c921cd-376e-6258-4805-34c760716978) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
+-----+----+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+--------+--------+
|class| age|height_cm|weight_kg|body fat_%|diastolic|systolic|gripForce|sit and bend forward_cm|sit-ups counts|broad jump_cm|gender_F|gender_M|
+-----+----+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+--------+--------+
|    1|28.0|    173.4|     72.8|      14.1|     82.0|   121.0|     41.4|                   18.4|          59.0|        242.0|       0|       1|
|    3|54.0|    155.9|     71.0|      44.1|     76.0|   139.0|     20.9|                   16.6|          24.0|        138.0|       1|       0|
|    0|23.0|    151.8|     46.9|      20.5|     60.0|   105.0|     22.2|                   28.9|          50.0|        178.0|       1|       0|
|    0|37.0|    161.7|     59.4|      19.4|     90.0|   155.0|     43.5|                   23.0|          57.0|  

In [73]:
#THIRD fold: rows whose "indice_fila_2" lies in
#[2*cantidadDeRegistrosValidacion, 3*cantidadDeRegistrosValidacion)
enTercerCorte = (
    (dfIndice["indice_fila_2"] >= 2*cantidadDeRegistrosValidacion)
    & (dfIndice["indice_fila_2"] < 3*cantidadDeRegistrosValidacion)
)

#Keep those rows and discard the two helper index columns
df3 = dfIndice.filter(enTercerCorte).drop("indice_fila", "indice_fila_2")

Calculation started (calculation_id=28c921cd-4bd1-9483-f293-da745415c9bd) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [74]:
#FOURTH fold: rows whose "indice_fila_2" lies in
#[3*cantidadDeRegistrosValidacion, 4*cantidadDeRegistrosValidacion)
enCuartoCorte = (
    (dfIndice["indice_fila_2"] >= 3*cantidadDeRegistrosValidacion)
    & (dfIndice["indice_fila_2"] < 4*cantidadDeRegistrosValidacion)
)

#Keep those rows and discard the two helper index columns
df4 = dfIndice.filter(enCuartoCorte).drop("indice_fila", "indice_fila_2")

Calculation started (calculation_id=2cc921cd-53c3-6609-6550-78a21bfb8f04) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [75]:
#FIFTH (last) fold: every row from 4*cantidadDeRegistrosValidacion onwards
#The fold size comes from an integer division of the row count, so the total is
#generally not an exact multiple of it. The last fold therefore has no upper bound:
#the leftover rows at the end of the dataset are kept instead of being silently
#excluded from every fold
df5 = dfIndice.filter(
    dfIndice["indice_fila_2"] >= 4*cantidadDeRegistrosValidacion
).drop("indice_fila").drop("indice_fila_2")

Calculation started (calculation_id=56c921cd-5b99-a7f7-3902-0376cc5e9149) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [76]:
#Name of the bucket where the cross-validation datasets are written
bucket = "datasetsbdajac"

Calculation started (calculation_id=26c921cd-6387-1186-10ba-38c9cd5bd9f8) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [80]:
#Base directory (inside the bucket) for the cross-validation datasets
directorioDataset = "data/bodyperformance_dataset_validacion_cruzada"

Calculation started (calculation_id=92c921cf-1322-1e01-2804-72bd2d407f4d) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [81]:
def generar_dataset_validacion_cruzada(dfTest, dfTrain, bucket, directorioDataset):
    """Write one train/test pair of a cross-validation split to S3 as CSV.

    dfTrain is saved under s3://<bucket>/<directorioDataset>/train/ and dfTest under
    s3://<bucket>/<directorioDataset>/test/. Both are written without a header row,
    comma separated, ISO-8859-1 encoded, overwriting any previous content. Afterwards
    the "_SUCCESS" object of each output directory is deleted.

    Parameters:
        dfTest: Spark dataframe with the validation rows.
        dfTrain: Spark dataframe with the training rows.
        bucket: name of the destination S3 bucket.
        directorioDataset: key prefix inside the bucket (no leading or trailing slash).

    Returns:
        None.
    """
    #Sub-directory name paired with the dataframe stored in it (training data first)
    particiones = (("train", dfTrain), ("test", dfTest))

    #Store each dataframe in its own directory
    for subdirectorio, df in particiones:
        directorioDestino = f"s3://{bucket}/{directorioDataset}/{subdirectorio}/"
        (
            df.write.format("csv")
            .option("header", "false")
            .option("delimiter", ",")
            .option("encoding", "ISO-8859-1")
            .mode("overwrite")
            .save(directorioDestino)
        )

    #Connect to the "S3" service to delete the "_SUCCESS" files
    s3 = boto3.client("s3")

    #Delete the "_SUCCESS" file of the training dataset, then of the validation dataset
    for subdirectorio, _ in particiones:
        s3.delete_object(
            Bucket = bucket,
            Key = f"{directorioDataset}/{subdirectorio}/_SUCCESS"
        )

Calculation started (calculation_id=06c921cf-219a-7d3e-74ba-a249c53eada9) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [82]:
#Generate every cross-validation dataset
#Each fold is used once as the validation set, while the union of the remaining
#folds (kept in their original order) is used as the training set
#Dataset number k is written under "<directorioDataset>/vc<k>"
cortes = [df1, df2, df3, df4, df5]

for numeroDeCorte, dfValidacion in enumerate(cortes, start=1):
    #All folds except the one currently used for validation
    cortesDeEntrenamiento = [
        corte
        for posicion, corte in enumerate(cortes, start=1)
        if posicion != numeroDeCorte
    ]

    #Chain the remaining folds into a single training dataframe
    dfEntrenamiento = cortesDeEntrenamiento[0]
    for corte in cortesDeEntrenamiento[1:]:
        dfEntrenamiento = dfEntrenamiento.union(corte)

    generar_dataset_validacion_cruzada(
        dfValidacion,
        dfEntrenamiento,
        bucket,
        f"{directorioDataset}/vc{numeroDeCorte}"
    )

Calculation started (calculation_id=f8c921cf-2c9d-8a7b-a6cb-4971b9dc9cb0) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [84]:
#Client used to upload the metadata file to S3
s3 = boto3.client("s3")

#Feature columns: every column of the dataframe except the label
columnas = dfDataset.columns
columnas.remove(label)

#Number of feature columns
dimensiones = len(columnas)

#Metadata describing the dataset, serialised as JSON below
metadata = dict(
    features=columnas,
    label=label,
    dimensiones=dimensiones,
    categorias=categorias,
    valoresPorCategoria=valoresUnicosDeCadaCategoria,
    mapeoDeCategoriasLabel=mapeoDeCategoriasLabel,
)

#Check the result
print(metadata)

#Temporary location on the server where the JSON file is written before the upload
rutaLocal = "/tmp/metadata.json"

#Save the metadata as a JSON file
with open(rutaLocal, "w") as archivo:
    archivo.write(json.dumps(metadata))

#Destination key of the file inside the bucket
rutaDestino = f"{rutaDeArchivo}_metadata/metadata.json"

#Check the result
print(rutaDestino)

#Upload the JSON file to S3
s3.upload_file(Filename=rutaLocal, Bucket=bucket, Key=rutaDestino)

Calculation started (calculation_id=d6c921d2-ab1f-edc9-8399-977cb854dbc9) in (session=a6c921aa-b4a5-97c7-80bd-834731a98f07). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
{'features': ['age', 'height_cm', 'weight_kg', 'body fat_%', 'diastolic', 'systolic', 'gripForce', 'sit and bend forward_cm', 'sit-ups counts', 'broad jump_cm', 'gender_F', 'gender_M'], 'label': 'class', 'dimensiones': 12, 'categorias': ['gender'], 'valoresPorCategoria': {'gender': ['F', 'M'], 'class': ['A', 'B', 'C', 'D']}, 'mapeoDeCategoriasLabel': {'A': 0, 'B': 1, 'C': 2, 'D': 3}}
data/bodyperformance_metadata/metadata.json

