### Dataframes, Spark SQL e Parquet

In [None]:
# Temos varias formas de crear un DataFrame:
# - a partir dunha lista de datos
# - lectura de ficheiros (diferentes formatos)
# -- Local Filesystem
# -- HDFS
# -- nube: S3 Azure, HBase, Mysql ...

In [None]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)

In [None]:
df. show()

In [None]:
# Unha das novidades de Spark SQL é que permite facer consultas sobre as táboas/Dataframees como se
# fosen táboas dunha base de datos relacional, utilizando ANSI SQL

# O primeiro que hai que facer é crear unha táboa temporal sobre o Dataframe con 'createOrReplaceTempView()'
# A continuación poderanse executar sentencias SQL a través da función 'sql()'

In [None]:
df.createOrReplaceTempView('employees')
spark.sql('SELECT * FROM employees').show()

In [None]:
# Podemos gardar o resultado como un novo dataframe
resultado = spark.sql('SELECT * FROM employees')
resultado.printSchema()
resultado.show()

In [None]:
# Podemos filtrar con condicións
spark.sql("SELECT firstname, middlename, salary FROM employees WHERE salary > 3500").show()

In [None]:
# Podemos realizar funcións de agrupación
spark.sql("SELECT gender, count(*) FROM employees GROUP BY gender").show()

In [None]:
# En resumo, spark.sql permítenos traballar cos Dataframes como se de táboas relacionais se tratase
# e utilizar SQL para realizar as consultas que precisemos

In [None]:
# Ademais dos ficheiros de texto CSV, tsv, xml, json... Spark pode traballar con outros formatos
# Algúns formatos moi empregados son Avro ou Parquet

In [None]:
# Apache Parquet é un formato de almacenamento con almacenamento columnar. Esta característica fai
# que sexa moi rápido no procesado de consultas de agregación
# PySpark soporta Parquet de xeito nativo, sen necesidade de icorporar novas librarías

In [None]:
# Escritura a formato parquet
df.write.parquet('file:///tmp/employees.parquet')

In [None]:
# Lectura de format parquet
df_parquet = spark.read.parquet('file:///tmp/employees.parquet')
df_parquet.show()

In [None]:
# Unhas das particularidades de Parquet é que permite crear particións, de xeito que os datos
# se poden almacenar en ficheiros separados en función das nosas necesidades, o que pode mellorar
# o rendemento en certas consultas

In [None]:
# Podemos crear particións para os datos, separando os datos por 'gender' e 'salary' en ficheiros diferentes
df.write.partitionBy("gender","salary").mode("overwrite").parquet("file:///tmp/output/employees2.parquet")

In [None]:
# Podemos cargar o ficheiro completo
df_parquet_partido = spark.read.parquet("file:///tmp/output/employees2.parquet")
df_parquet_partido.show()

In [None]:
# Podemos cargar só unha partición (a partición de )gender=M)
df_parquet_partido = spark.read.parquet("file:///tmp/output/employees2.parquet/gender=M")
df_parquet_partido.show()

In [None]:
# Podemos cargar só unha partición máis específica
# Fíxate como xa non aparecen os campos
df_parquet_partido = spark.read.parquet("file:///tmp/output/employees2.parquet/gender=M/salary=4000")
df_parquet_partido.show()