# Spark II: Spark SQL

### 1. Crear SQLContext

In [None]:
import pyspark

# Reuse the SparkContext if one is already running (e.g. when this cell is
# executed a second time). Constructing a second SparkContext in the same
# process raises "ValueError: Cannot run multiple SparkContexts at once".
# 'local[*]' runs Spark locally with one worker thread per logical core.
# If a context already exists, its configuration is kept as-is; the master set
# here only applies when a new context is created.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))

In [None]:
# Spark SQL entry point, built on top of the SparkContext `sc`.
# NOTE: SQLContext is deprecated since Spark 3.0 in favour of SparkSession; it is
# kept here because every later cell of this notebook goes through `sqlContext`.
sqlContext = pyspark.sql.SQLContext(sparkContext=sc)

In [None]:
# Evaluate the context as the cell's last expression so the notebook shows its repr.
sqlContext

### 2. Crear un DataFrame

 ***A partir de una lista***

In [None]:
# DataFrame from a list of tuples. No schema is given, so Spark names the
# columns _1 and _2 and infers their types from the Python values.
df1 = sqlContext.createDataFrame(
    data=[('Raul', 7), ('Michel', 8), ('Palomeque', 8)],
)

In [None]:
# collect() returns every row to the driver as a list of Row objects.
df1.collect()

In [None]:
# Same rows as df1, but the schema argument gives the columns explicit names.
df2 = sqlContext.createDataFrame(
    data=[('Raul', 7), ('Michel', 8), ('Palomeque', 8)],
    schema=['jugador', 'numero'],
)

In [None]:
# The returned Row objects now carry the field names 'jugador' and 'numero'.
df2.collect()

***A partir de un diccionario***

In [None]:
# DataFrame from a list of dicts: the column names come from the dict keys.
# NOTE(review): some PySpark versions warn that inferring the schema from a dict
# is deprecated in favour of pyspark.sql.Row.
df3 = sqlContext.createDataFrame(data=[dict(jugador='Raul', numero=7)])

In [None]:
# Inspect the single Row built from the dict.
df3.collect()

***Seleccionar columnas***

In [None]:
# Project a single column and bring its rows back to the driver.
df2.select(df2['jugador']).collect()

In [None]:
# Same projection, printed as a text table instead of returned as a list.
df2.select(['jugador']).show()

In [None]:
# Select both columns (by name here; Column objects work just as well).
df2.select('jugador', 'numero').show()

In [None]:
# Columns can be combined into expressions; the new column is titled "(numero + 1)".
df2.select(df2.jugador, df2.numero + 1).show()

***Filtrar filas***

In [None]:
# Keep only the rows whose number is greater than 7 (where() is an alias of filter()).
df2.where(df2.numero > 7).show()

***Agrupar por un campo***

In [None]:
# Count how many players share each number (groupby() is an alias of groupBy()).
df2.groupby(df2['numero']).count().show()

***Transformar a RDD***

In [None]:
# .rdd exposes the DataFrame as an RDD of Row objects; collect() returns them as a list.
df2.rdd.collect()

In [None]:
# Print the column names and the types Spark inferred from the Python data.
df2.printSchema()

### 3. Cargar DataFrame a partir de un fichero

***Cargar de un fichero csv***

In [None]:
# Read the CSV with default options: no header row and ',' as the separator.
df_test = sqlContext.read.format('csv').load('data/test.csv')

In [None]:
# Displaying the DataFrame shows only its schema. With default CSV options the
# columns are named _c0, _c1, ... and typed as strings.
df_test

In [None]:
# Print the first rows; with no header option the file's first line appears as data.
df_test.show(n=20, truncate=True)

***Cargar con una cabecera***

In [None]:
# Use the first line of the file as column names.
df_test = sqlContext.read.csv('data/test.csv', header='true')

In [None]:
# Print the rows again, now with named columns.
df_test.show(20)

***Cargar con un delimitador***

In [None]:
# Header row plus ';' as the field separator.
df_test = sqlContext.read.options(header='true', delimiter=';').csv('data/test.csv')

In [None]:
# Print the rows parsed with the ';' separator.
df_test.show(n=20)

### 4. Un ejemplo un poco más real

In [None]:
# Load the neighbourhoods file (';'-separated, with a header row).
# No inferSchema option is set, so every column is read as a string.
df_barrios = sqlContext.read.options(header='true', delimiter=';').csv('data/Barrios.csv')

In [None]:
# All columns show up as strings because the CSV was read without inferSchema.
df_barrios.printSchema()

In [None]:
# count() runs a Spark job and returns the number of rows.
df_barrios.count()

In [None]:
# List of column names, taken from the CSV header row.
df_barrios.columns

In [None]:
# Print the first rows of the neighbourhoods table.
df_barrios.show(n=20, truncate=True)

In [None]:
# Project the district code and the neighbourhood name.
df_barrios.select(['CODIGO_DISTRITO', 'Nombre de barrio']).show()

In [None]:
# Load the districts file with the same CSV options as the neighbourhoods file.
df_distritos = sqlContext.read.options(header='true', delimiter=';').csv('data/Distritos.csv')

In [None]:
# Print the first rows of the districts table.
df_distritos.show(20)

In [None]:
# Inspect the districts schema (all strings: read without inferSchema).
df_distritos.printSchema()

In [None]:
# Print the neighbourhoods schema again to compare it with the districts one:
# both contain CODIGO_DISTRITO, the key used to join them.
df_barrios.printSchema()

In [None]:
# Joining without a join condition pairs every neighbourhood with every district
# (a cartesian product). crossJoin() states that intent explicitly. A bare
# join(df) with no condition is rejected on Spark 2.x ("Detected implicit
# cartesian product") unless spark.sql.crossJoin.enabled is set.
df_barrios.select('CODIGO_DISTRITO', 'Nombre de barrio').crossJoin(df_distritos).show()

In [None]:
# Inner join on the shared key; joining on a column name keeps a single
# CODIGO_DISTRITO column in the result.
df_barrios_distritos = (
    df_barrios
    .select('CODIGO_DISTRITO', 'Nombre de barrio')
    .join(df_distritos, on=['CODIGO_DISTRITO'], how='inner')
)
df_barrios_distritos.show(n=20)

In [None]:
# Keep only the district name and the neighbourhood name.
df_barrios_distritos = df_barrios_distritos.select(['DISTRITO', 'Nombre de barrio'])
df_barrios_distritos.show(20)

In [None]:
# Neighbourhoods belonging to the ARGANZUELA district.
df_barrios_distritos.where(df_barrios_distritos.DISTRITO == 'ARGANZUELA').show()

In [None]:
# groupBy() returns a GroupedData object rather than a DataFrame; displaying it
# only shows the object, because no aggregation has been applied yet.
distrito_grouped = df_barrios_distritos.groupBy('DISTRITO')
distrito_grouped

In [None]:
# Number of neighbourhoods per district.
distrito_grouped.count().show(n=20, truncate=True)

In [None]:
# sum() with no arguments aggregates every numeric column of the grouped data.
# NOTE(review): df_barrios_distritos only holds 'DISTRITO' and 'Nombre de barrio',
# both strings (the CSVs were read without inferSchema), so this output contains
# only the grouping column. Confirm whether a numeric column was meant to be summed.
distrito_grouped.sum().show()

In [None]:
# Register the DataFrame as a temporary view so SQL queries can refer to it as BARRIOS.
df_barrios.createOrReplaceTempView(name='BARRIOS')

In [None]:
# Register the districts DataFrame as the temporary view DISTRITOS.
df_distritos.createOrReplaceTempView(name='DISTRITOS')

In [None]:
# Query the view with SQL; the result is a regular DataFrame.
todos_barrios = sqlContext.sql(sqlQuery='SELECT * FROM BARRIOS')

In [None]:
# Print the first rows of the SQL result.
todos_barrios.show(20)

In [None]:
# Neighbourhoods of district '01'. Standard SQL string literals use single
# quotes; double quotes denote identifiers in ANSI SQL, and Spark treats them
# that way when ANSI mode and spark.sql.ansi.doubleQuotedIdentifiers are enabled.
# CODIGO_DISTRITO is a string column (CSV read without inferSchema), so it is
# compared with the string '01'.
distrito_uno = sqlContext.sql("SELECT * FROM BARRIOS WHERE CODIGO_DISTRITO = '01'")

In [None]:
# Print the neighbourhoods of district 01.
distrito_uno.show(n=20)

In [None]:
# Join districts and neighbourhoods on their shared key with SQL.
# USING (CODIGO_DISTRITO) keeps a single CODIGO_DISTRITO column in the result.
# With "ON BARRIOS.CODIGO_DISTRITO = DISTRITOS.CODIGO_DISTRITO" plus SELECT *,
# the result would contain two columns with that name, and any later reference
# to CODIGO_DISTRITO on this DataFrame would fail as ambiguous.
distritos_barrios = sqlContext.sql(
    'SELECT * FROM DISTRITOS INNER JOIN BARRIOS USING (CODIGO_DISTRITO)'
)

In [None]:
# District name next to each neighbourhood name, taken from the SQL join.
distritos_barrios.select(['DISTRITO', 'Nombre de barrio']).show()

In [None]:
# Print the physical plan Spark will execute for this projection.
distritos_barrios.select(['DISTRITO', 'Nombre de barrio']).explain(extended=False)

In [None]:
# Neighbourhood name with its area and perimeter columns.
df_barrio_info = distritos_barrios.select(
    ['Nombre de barrio', 'Superficie (m2)', 'Perimetro (m)']
)

In [None]:
# Print the first rows of the neighbourhood info.
df_barrio_info.show(n=20, truncate=True)

In [None]:
# 'Superficie (m2)' was read from the CSV as a string, so ordering by the raw
# column is lexicographic ('100' sorts before '20'). Ordering by the value cast
# to double gives a numeric order; the displayed column keeps its original text.
# NOTE(review): assumes the values use '.' as the decimal separator. Values that
# do not parse as numbers cast to null and sort first.
df_barrio_info.orderBy(df_barrio_info['Superficie (m2)'].cast('double')).show()

In [None]:
# 'Perimetro (m)' is a string column (CSV read without inferSchema), so ordering
# by it directly is lexicographic. Cast to double to sort numerically while still
# displaying the original values.
# NOTE(review): assumes '.' as the decimal separator; unparseable values become
# null and sort first.
df_barrio_info.orderBy(df_barrio_info['Perimetro (m)'].cast('double')).show()

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the area.
# The column is a string, and describe() reports min/max of strings
# lexicographically, so it is cast to double first. The alias keeps the
# original column name.
# NOTE(review): assumes '.' as the decimal separator; unparseable values become
# null and are excluded from the statistics.
df_barrio_info.select(
    df_barrio_info['Superficie (m2)'].cast('double').alias('Superficie (m2)')
).describe('Superficie (m2)').show()

In [None]:
# Summary statistics for the perimeter. The column is cast from string to double
# so that min/max are numeric rather than lexicographic. The alias keeps the
# original column name.
# NOTE(review): assumes '.' as the decimal separator; unparseable values become
# null and are excluded from the statistics.
df_barrio_info.select(
    df_barrio_info['Perimetro (m)'].cast('double').alias('Perimetro (m)')
).describe('Perimetro (m)').show()

In [None]:
# Add a derived column with the perimeter in kilometres. 'Perimetro (m)' is a
# string column, so the division relies on Spark's implicit string-to-number coercion.
df_barrio_info.withColumn(
    colName='Perimetro (km)',
    col=df_barrio_info['Perimetro (m)'] / 1000,
).show()