In [1]:
!pip install pyspark py4j
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.types import Row
from pyspark.sql import SQLContext


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 44 kB/s 
[?25hCollecting py4j
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[K     |████████████████████████████████| 200 kB 52.4 MB/s 
[?25h  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 48.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=864c418b8df948211cfbe3adc45129d7049872685e3fbbdf7cab315aab40383a
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
spark= SparkContext.getOrCreate();

sqlContext =SQLContext(spark)

path= "/content/sample_data/platziData/"




In [3]:
#creamos Schema
juegoSchema=StructType([
    StructField("juego_id",IntegerType(),False),
    StructField("anio",StringType(),False),
    StructField("temporada",StringType(),False),
    StructField("ciudad",StringType(),False)
])

#Creamos data frame
#linea option, header true para que el encabezado sea considerado header en vez de columna
juegoDF=sqlContext.read.schema(juegoSchema) \
        .option("header","true").csv(path+"juegos.csv")

juegoDF.show(4)


+--------+-----------+---------+------+
|juego_id|       anio|temporada|ciudad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
+--------+-----------+---------+------+
only showing top 4 rows



In [4]:
def deleteHeader(index,iterator):

    #devuelve la lista a partir del primer valor del rdd, osea sin el encabezado
    return iter(list(iterator)[1:])

In [5]:
depOlimpicoRDD = spark.textFile(path+"deportista.csv") \
.map(lambda line : line.split(","))

depOlimpicoRDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [6]:
#eliminamos encabezado para que no haya excepciones de tipo
depOlimpicoRDD = depOlimpicoRDD.mapPartitionsWithIndex(deleteHeader)

#casteamos los valores  del rdd para evitar excepciones
depOlimpicoRDD=depOlimpicoRDD.map(lambda l: (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

#creamos schema
depOlimpicoSchema=StructType([
    StructField("deportista_id",IntegerType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",IntegerType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)
])

#creamos Data Frame
depOlimpicosDF=sqlContext.createDataFrame(depOlimpicoRDD,depOlimpicoSchema)


In [7]:
depOlimpicosDF.show(5)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows



In [8]:
#OPERACIONES SOBRE Data Frames

#aqui cambiariamos de nombre genero por sexo y eliminariamos la columna altura
depOlimpicosDF.withColumnRenamed("genero","sexo").drop("altura")
depOlimpicosDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [9]:
from pyspark.sql.functions import *

#como hacer un select dep_id, nombre, 
#  col genera una lista con todos los valores de la columna y poder hacer operaciones sobre esa lista
depOlimpicosDF= depOlimpicosDF.select("deportista_id","nombre"
                      ,col("edad").alias("edadAlJugar")
                      ,"equipo_id")

depOlimpicosDF.show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [10]:
#solo edad mas de 0 sorteado por edad asc

depOlimpicosDF = depOlimpicosDF.filter(
    (depOlimpicosDF.edadAlJugar != 0)
)
depOlimpicosDF.sort("edadAlJugar").show(10)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        47618|Sonja Henie Toppi...|         11|      742|
|        37333|Carlos Bienvenido...|         11|      982|
|        51268|      Beatrice Hutiu|         11|      861|
|        40129|    Luigina Giavotti|         11|      507|
|        52070|        Etsuko Inada|         11|      514|
|        22411|Magdalena Cecilia...|         11|      413|
|        48939|             Ho Gang|         12|      738|
|        42835|   Werner Grieshofer|         12|       71|
|        47506|        Hem Reaksmey|         12|      168|
|        40296|    Alain C. Giletti|         12|      362|
+-------------+--------------------+-----------+---------+
only showing top 10 rows



In [11]:
#CREAMOS RESULTADO DF PARA LUEGO HACER JOINS

#sacamos info con RDD
resultsRDD = spark.textFile(path+"resultados.csv") \
.map(lambda line : line.split(","))

#Quitamos header
resultsRDD = resultsRDD.mapPartitionsWithIndex(deleteHeader)

#casteamos los valores  del rdd para evitar excepciones
resultsRDD=resultsRDD.map(lambda l: (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4])
))

#creamos schema
resultsSchema=StructType([
    StructField("resultado_id",IntegerType(),False),
    StructField("medalla",StringType(),False),
    StructField("deportista_id",IntegerType(),False),
    StructField("juego_id",IntegerType(),False),
    StructField("evento_id",IntegerType(),False)
])

#creamos Data Frame
resultsDF=sqlContext.createDataFrame(resultsRDD,resultsSchema)



In [12]:
#JOINS SOBRE DF

depOlimpicosDF.join(
    resultsDF,
    depOlimpicosDF.deportista_id==resultsDF.deportista_id,
    "left"
) \
.join(
    juegoDF,
    juegoDF.juego_id==resultsDF.juego_id,
    "left"
) 


DataFrame[deportista_id: int, nombre: string, edadAlJugar: int, equipo_id: int, resultado_id: int, medalla: string, deportista_id: int, juego_id: int, evento_id: int, juego_id: int, anio: string, temporada: string, ciudad: string]

In [13]:
#Funciones de agregacion

#count
depOlimpicosDF.sort("edadAlJugar").groupBy("edadAlJugar").count().take(5)


[Row(edadAlJugar=31, count=1200),
 Row(edadAlJugar=65, count=7),
 Row(edadAlJugar=53, count=38),
 Row(edadAlJugar=34, count=624),
 Row(edadAlJugar=28, count=2751)]

In [14]:
#agg es el modo mas recomendado para hacer agrupaciones

depOlimpicosDF.sort("edadAlJugar").groupBy("equipo_id").agg(avg("edadAlJugar").alias("edad_promedio")).take(5)

[Row(equipo_id=496, edad_promedio=24.65863453815261),
 Row(equipo_id=148, edad_promedio=22.6),
 Row(equipo_id=1088, edad_promedio=23.607526881720432),
 Row(equipo_id=471, edad_promedio=24.0),
 Row(equipo_id=833, edad_promedio=36.0)]

In [15]:
#SQL
depOlimpicosDF.registerTempTable("deportista")
sqlContext.sql("""
                SELECT nombre, edadAlJugar
                FROM deportista
                WHERE edadAlJugar >30
                ;
                    """).show()



+--------------------+-----------+
|              nombre|edadAlJugar|
+--------------------+-----------+
|Edgar Lindenau Aabye|         34|
|     Per Knut Aaland|         31|
|        John Aalberg|         31|
|   Jyri Tapani Aalto|         31|
|Pirjo Hannele Aal...|         32|
|Timo Antero Aaltonen|         31|
|Win Valdemar Aalt...|         54|
|     JanErik Aarberg|         43|
|  Lars Thorlaksn Aas|         33|
|Morten Gjerdrum A...|         34|
|         Hakon Aasns|         41|
|          Hans Aasns|         33|
|      M Bairo Abakar|         31|
|      Jol Marc Abati|         34|
|          Ral Abatte|         38|
|   Georgia Abatzidou|         35|
|Youssef Mohamed A...|         31|
|       Tran Abbasova|         33|
|William Bill Abbo...|         42|
|       Joanne Abbott|         41|
+--------------------+-----------+
only showing top 20 rows



In [16]:
#UDF , user defined function, revisatelo mas tarde de nuevo



In [17]:
#PERSISTENCIA DE DATOS CON SPARK
from pyspark.storagelevel import StorageLevel



In [19]:
depOlimpicosDF.is_cached #si esta en cache

False

In [28]:
depOlimpicosDF.rdd.cache()
depOlimpicosDF.is_cached #si esta en cache

True

In [24]:
depOlimpicosDF.rdd.getStorageLevel()
#StorageLevel(useDisk,useMemory,useOfHeap,deserialized,replication=1)
#useDisk, bol uso del disco 
#useMemory, bol uso de memoria
#useOfHeap, bol tener la mitad de datos almacenada
#deserialized,bol serializado o no
#replication ,int replicacion

StorageLevel(False, False, False, False, 1)

In [29]:
depOlimpicosDF.rdd.unpersist() #quitas peristencia

MapPartitionsRDD[44] at javaToPython at NativeMethodAccessorImpl.java:0

In [25]:
depOlimpicosDF.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[44] at javaToPython at NativeMethodAccessorImpl.java:0

In [None]:
#PARTICIONADO DE DATOS
