Spark has become part of most data scientists' toolkit. It is an open-source framework for parallel computing on clusters. It is used especially to speed up iterative computation over large amounts of data or very complex models.

PySpark SQL is the Python interface to Spark SQL. It is mainly used for processing structured and semi-structured data. Its API can read data from a wide range of sources, and those sources may use different data formats.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *

PySpark modules

Some of the important classes and their features are described below:

pyspark.sql.SparkSession: This class lets programmers work in Spark with DataFrames and SQL functionality. A SparkSession is used to create DataFrames, register DataFrames as tables, cache tables, and run SQL over tables.

Through the SparkSession you can create a DataFrame, or register a DataFrame as a table and run SQL operations against it. A Spark DataFrame is similar to a pandas DataFrame, but it is distributed and evaluated lazily.

When working with DataFrames, the SparkSession is the entry point from which all functionality hangs. We create one as follows.

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
# Once the session is created, build a DataFrame from a CSV file.
df = spark.read.csv("C:/Users/URIEL/Documents/DATOS (CSV,XML,JSON,ETC)/planets.csv", header = True, inferSchema = True)

In [4]:
df.show()

+---------------+------+--------------+-----+--------+----+
|         method|number|orbital_period| mass|distance|year|
+---------------+------+--------------+-----+--------+----+
|Radial Velocity|     1|         269.3|  7.1|    77.4|2006|
|Radial Velocity|     1|       874.774| 2.21|   56.95|2008|
|Radial Velocity|     1|         763.0|  2.6|   19.84|2011|
|Radial Velocity|     1|        326.03| 19.4|  110.62|2007|
|Radial Velocity|     1|        516.22| 10.5|  119.47|2009|
|Radial Velocity|     1|        185.84|  4.8|   76.39|2008|
|Radial Velocity|     1|        1773.4| 4.64|   18.15|2002|
|Radial Velocity|     1|         798.5| null|   21.41|1996|
|Radial Velocity|     1|         993.3| 10.3|    73.1|2008|
|Radial Velocity|     2|         452.8| 1.99|   74.79|2010|
|Radial Velocity|     2|         883.0| 0.86|   74.79|2010|
|Radial Velocity|     1|         335.1| 9.88|   39.43|2009|
|Radial Velocity|     1|         479.1| 3.88|   97.28|2008|
|Radial Velocity|     3|        1078.0| 

In [18]:
df.columns

['method', 'number', 'orbital_period', 'mass', 'distance', 'year']

In [19]:
df.printSchema()

root
 |-- method: string (nullable = true)
 |-- number: integer (nullable = true)
 |-- orbital_period: double (nullable = true)
 |-- mass: double (nullable = true)
 |-- distance: double (nullable = true)
 |-- year: integer (nullable = true)



In [21]:
df.select('method','number').show()

+---------------+------+
|         method|number|
+---------------+------+
|Radial Velocity|     1|
|Radial Velocity|     1|
|Radial Velocity|     1|
|Radial Velocity|     1|
|Radial Velocity|     1|
|Radial Velocity|     1|
|Radial Velocity|     1|
|Radial Velocity|     1|
|Radial Velocity|     1|
|Radial Velocity|     2|
|Radial Velocity|     2|
|Radial Velocity|     1|
|Radial Velocity|     1|
|Radial Velocity|     3|
|Radial Velocity|     3|
|Radial Velocity|     3|
|Radial Velocity|     1|
|Radial Velocity|     5|
|Radial Velocity|     5|
|Radial Velocity|     5|
+---------------+------+
only showing top 20 rows



In [17]:
type(df)

pyspark.sql.dataframe.DataFrame

In [18]:
df.select(df['method'], df['number'] + 1).show()

+---------------+------------+
|         method|(number + 1)|
+---------------+------------+
|Radial Velocity|           2|
|Radial Velocity|           2|
|Radial Velocity|           2|
|Radial Velocity|           2|
|Radial Velocity|           2|
|Radial Velocity|           2|
|Radial Velocity|           2|
|Radial Velocity|           2|
|Radial Velocity|           2|
|Radial Velocity|           3|
|Radial Velocity|           3|
|Radial Velocity|           2|
|Radial Velocity|           2|
|Radial Velocity|           4|
|Radial Velocity|           4|
|Radial Velocity|           4|
|Radial Velocity|           2|
|Radial Velocity|           6|
|Radial Velocity|           6|
|Radial Velocity|           6|
+---------------+------------+
only showing top 20 rows



In [22]:
df.filter(df['distance'] > 100).show()

+--------------------+------+--------------+----+--------+----+
|              method|number|orbital_period|mass|distance|year|
+--------------------+------+--------------+----+--------+----+
|     Radial Velocity|     1|        326.03|19.4|  110.62|2007|
|     Radial Velocity|     1|        516.22|10.5|  119.47|2009|
|             Imaging|     1|          null|null|   165.0|2007|
|             Imaging|     1|          null|null|   140.0|2004|
|             Imaging|     1|          null|null|   145.0|2013|
|             Imaging|     1|          null|null|   139.0|2004|
|Eclipse Timing Va...|     2|        5767.0|null|  130.72|2008|
|Eclipse Timing Va...|     2|        3321.0|null|  130.72|2008|
|Eclipse Timing Va...|     2|       5573.55|null|   500.0|2010|
|Eclipse Timing Va...|     2|        2883.5|null|   500.0|2010|
|             Imaging|     1|          null|null|   145.0|2008|
|             Imaging|     1|          null|null|   140.0|2010|
|     Radial Velocity|     1|        137

In [26]:
df.groupBy("method").sum('distance').show()

+--------------------+------------------+
|              method|     sum(distance)|
+--------------------+------------------+
|Transit Timing Va...|            3313.0|
|             Imaging|           2166.91|
|             Transit|134242.77000000002|
|Pulsation Timing ...|              null|
|          Astrometry|             35.75|
|        Microlensing|           41440.0|
|       Pulsar Timing|            1200.0|
|Orbital Brightnes...|            2360.0|
|Eclipse Timing Va...|           1261.44|
|     Radial Velocity| 27348.11000000001|
+--------------------+------------------+



In [30]:
# SQL queries
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("table")

In [72]:
query = """SELECT method AS Metodo, COUNT(year) AS Cantidad_Anos
FROM table
WHERE orbital_period IS NOT NULL
GROUP BY method
ORDER BY COUNT(year) DESC; """

In [73]:
spark.sql(query).show()

+--------------------+-------------+
|              Metodo|Cantidad_Anos|
+--------------------+-------------+
|     Radial Velocity|          553|
|             Transit|          397|
|             Imaging|           12|
|Eclipse Timing Va...|            9|
|        Microlensing|            7|
|       Pulsar Timing|            5|
|Transit Timing Va...|            3|
|Orbital Brightnes...|            3|
|          Astrometry|            2|
|Pulsation Timing ...|            1|
+--------------------+-------------+



In [82]:
query2 = """ SELECT method AS Metodo, AVG(distance) as Promedio_Distancia, SUM(distance) AS Suma_Distancia
FROM TABLE 
WHERE orbital_period IS NOT NULL 
GROUP BY method
HAVING AVG(distance) IS NOT NULL AND SUM(distance) IS NOT NULL"""

In [85]:
 df2 = spark.sql(query2)

In [87]:
df2 = df2.toPandas()

In [88]:
df2

   Metodo                         Promedio_Distancia  Suma_Distancia
0  Transit Timing Variations      1104.333333         3313.0
1  Imaging                        27.371              273.71
2  Transit                        599.29808           134242.77
3  Astrometry                     17.875              35.75
4  Pulsar Timing                  1200.0              1200.0
5  Orbital Brightness Modulation  1180.0              2360.0
6  Eclipse Timing Variations      315.36              1261.44
7  Radial Velocity                51.600208           27348.11


In [89]:
type(df2)

pandas.core.frame.DataFrame

* pyspark.sql.Column: this class represents a column of a DataFrame; column instances are obtained from a DataFrame or built from expressions.

In [111]:
from pyspark.sql import *


In [112]:
# sqlContext is never defined in this notebook; the SparkSession-era
# equivalent is a temporary view, which we can query later with spark.sql.
df.createOrReplaceTempView('dataframe')

In [113]:
df.rdd.getNumPartitions()  # number of partitions the data has been split into

1

In [114]:
results = df.collect()

In [117]:
print(results)  # a list of Row objects, one per record, with the DataFrame's fields

[Row(method='Radial Velocity', number=1, orbital_period=269.3, mass=7.1, distance=77.4, year=2006), Row(method='Radial Velocity', number=1, orbital_period=874.774, mass=2.21, distance=56.95, year=2008), Row(method='Radial Velocity', number=1, orbital_period=763.0, mass=2.6, distance=19.84, year=2011), Row(method='Radial Velocity', number=1, orbital_period=326.03, mass=19.4, distance=110.62, year=2007), Row(method='Radial Velocity', number=1, orbital_period=516.22, mass=10.5, distance=119.47, year=2009), Row(method='Radial Velocity', number=1, orbital_period=185.84, mass=4.8, distance=76.39, year=2008), Row(method='Radial Velocity', number=1, orbital_period=1773.4, mass=4.64, distance=18.15, year=2002), Row(method='Radial Velocity', number=1, orbital_period=798.5, mass=None, distance=21.41, year=1996), Row(method='Radial Velocity', number=1, orbital_period=993.3, mass=10.3, distance=73.1, year=2008), Row(method='Radial Velocity', number=2, orbital_period=452.8, mass=1.99, distance=74.79

In [121]:
df.show(30, False)

+---------------+------+--------------+-----+--------+----+
|method         |number|orbital_period|mass |distance|year|
+---------------+------+--------------+-----+--------+----+
|Radial Velocity|1     |269.3         |7.1  |77.4    |2006|
|Radial Velocity|1     |874.774       |2.21 |56.95   |2008|
|Radial Velocity|1     |763.0         |2.6  |19.84   |2011|
|Radial Velocity|1     |326.03        |19.4 |110.62  |2007|
|Radial Velocity|1     |516.22        |10.5 |119.47  |2009|
|Radial Velocity|1     |185.84        |4.8  |76.39   |2008|
|Radial Velocity|1     |1773.4        |4.64 |18.15   |2002|
|Radial Velocity|1     |798.5         |null |21.41   |1996|
|Radial Velocity|1     |993.3         |10.3 |73.1    |2008|
|Radial Velocity|2     |452.8         |1.99 |74.79   |2010|
|Radial Velocity|2     |883.0         |0.86 |74.79   |2010|
|Radial Velocity|1     |335.1         |9.88 |39.43   |2009|
|Radial Velocity|1     |479.1         |3.88 |97.28   |2008|
|Radial Velocity|3     |1078.0        |2

# USING COUNT TO GET THE TOTAL

In [123]:
df.count()

1035

# LAMBDA FUNCTIONS AND USER DEFINED FUNCTIONS

In [145]:
from pyspark.sql.types import BooleanType 
from pyspark.sql.functions import udf

In [156]:
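# UDF that returns True for years before 2010; the None check guards
# against null years, which would otherwise raise a TypeError in Python.
less_ten = udf(lambda s: s is not None and s < 2010, BooleanType())
lamdf = df.filter(less_ten(df.year))
lamdf.show()
lamdf.count()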

In [163]:
df.distinct().show()

+---------------+------+--------------+-----+--------+----+
|         method|number|orbital_period| mass|distance|year|
+---------------+------+--------------+-----+--------+----+
|Radial Velocity|     2|        5000.0| 0.82|   25.65|2009|
|        Transit|     5|       5.90124| null|   303.0|2011|
|        Transit|     5|      18.16406| null|   368.0|2013|
|  Pulsar Timing|     1|    0.09070629| null|  1200.0|2011|
|Radial Velocity|     1|        2502.0| 1.55|    3.22|2000|
|Radial Velocity|     1|         829.0|  0.8|    32.7|2001|
|Radial Velocity|     1|      2.817822| 0.38|    35.8|2005|
|Radial Velocity|     2|         30.93|0.076|   56.92|2013|
|Radial Velocity|     3|         123.0| null|    null|2014|
|        Transit|     1|       138.317| null|  1000.0|2012|
|        Transit|     3|       2.15491| null|    null|2012|
|        Transit|     1|     1.4822446| null|   360.0|2011|
|Radial Velocity|     1|        1840.0|  1.6|   67.61|2009|
|Radial Velocity|     2|         408.6| 

In [165]:
# Random samples from the DataFrame
muestraDF = df.sample(withReplacement = False, fraction = 0.05)
muestraDF.show()

+---------------+------+--------------+------+--------+----+
|         method|number|orbital_period|  mass|distance|year|
+---------------+------+--------------+------+--------+----+
|        Imaging|     1|        6000.0|  null|   19.28|2008|
|Radial Velocity|     1|         714.3|  10.6|    null|2007|
|Radial Velocity|     2|        3.8728| 0.027|    20.1|2013|
|        Transit|     1|     1.5089557|  null|    null|2008|
|        Transit|     1|       2.99433|  null|   560.0|2010|
|        Transit|     1|       1.51214|  null|  1340.0|2010|
|        Transit|     1|        3.7681|  null|   920.0|2011|
|        Transit|     1|     1.9000693|  null|   870.0|2011|
|Radial Velocity|     1|        4.6938| 0.035|    4.54|2007|
|Radial Velocity|     4|       61.1166|2.2756|     4.7|1998|
|Radial Velocity|     1|        133.71|  3.37|   17.62|2000|
|        Transit|     2|     10.338523|  null|    90.0|2010|
|        Transit|     1|      4.457243|  null|   501.0|2012|
|Radial Velocity|     2|