**PYSPARK SQL**

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('app').getOrCreate()
df = spark.read.csv('datos.csv', header=True, inferSchema=True)
df.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/08 10:13:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+---+--------+------+--------+
| id|producto|precio|cantidad|
+---+--------+------+--------+
|  1| Manzana|   1.2|       5|
|  2|  Banana|   0.8|       3|
|  3| Naranja|   1.0|       4|
|  4|    Pera|   1.5|       2|
|  5| Manzana|   1.2|       6|
+---+--------+------+--------+



In [14]:
# pyspark.sql.dataframe.DataFrame
type(df)

pyspark.sql.dataframe.DataFrame

In [3]:
# pyspark sql functions: select, col, expr, 
import pyspark.sql.functions as F
df.select(F.col('producto'), F.expr('length(producto)').alias('longitud')).show()

+--------+--------+
|producto|longitud|
+--------+--------+
| Manzana|       7|
|  Banana|       6|
| Naranja|       7|
|    Pera|       4|
| Manzana|       7|
+--------+--------+



In [4]:
# Literal lit
df.select(F.lit('texto fijo').alias('Texto'), df.id).show()

+----------+---+
|     Texto| id|
+----------+---+
|texto fijo|  1|
|texto fijo|  2|
|texto fijo|  3|
|texto fijo|  4|
|texto fijo|  5|
+----------+---+



In [7]:
# group data (groupby)
grouped_df = df.groupby('producto').agg(F.sum('cantidad').alias('total_cantidad'), F.mean('cantidad').alias('mean_cantidad'))
grouped_df.show()

+--------+--------------+-------------+
|producto|total_cantidad|mean_cantidad|
+--------+--------------+-------------+
| Naranja|             4|          4.0|
|  Banana|             3|          3.0|
|    Pera|             2|          2.0|
| Manzana|            11|          5.5|
+--------+--------------+-------------+



In [9]:
# aggregate data
avg_price = df.agg(F.avg('precio'))
avg_price.show()

+------------------+
|       avg(precio)|
+------------------+
|1.1400000000000001|
+------------------+



In [10]:
# remove duplicates (distinct)
df_no_duplicates = df.distinct()
df_no_duplicates.show()

+---+--------+------+--------+
| id|producto|precio|cantidad|
+---+--------+------+--------+
|  2|  Banana|   0.8|       3|
|  5| Manzana|   1.2|       6|
|  1| Manzana|   1.2|       5|
|  3| Naranja|   1.0|       4|
|  4|    Pera|   1.5|       2|
+---+--------+------+--------+



In [11]:
# order results (orderBy)
df.orderBy('precio', ascending=False).show()

+---+--------+------+--------+
| id|producto|precio|cantidad|
+---+--------+------+--------+
|  4|    Pera|   1.5|       2|
|  1| Manzana|   1.2|       5|
|  5| Manzana|   1.2|       6|
|  3| Naranja|   1.0|       4|
|  2|  Banana|   0.8|       3|
+---+--------+------+--------+



In [12]:
# conditional data in column (when)
df.withColumn('calidad', 
              F.when(F.expr('precio > 1.3'), 'alta')
              .when(F.expr('precio >= 1'), 'Media')
              .otherwise('Baja')
).show()

+---+--------+------+--------+-------+
| id|producto|precio|cantidad|calidad|
+---+--------+------+--------+-------+
|  1| Manzana|   1.2|       5|  Media|
|  2|  Banana|   0.8|       3|   Baja|
|  3| Naranja|   1.0|       4|  Media|
|  4|    Pera|   1.5|       2|   alta|
|  5| Manzana|   1.2|       6|  Media|
+---+--------+------+--------+-------+



In [13]:
# filter strings columns
df.filter(F.col('producto').contains('anz')).show()

+---+--------+------+--------+
| id|producto|precio|cantidad|
+---+--------+------+--------+
|  1| Manzana|   1.2|       5|
|  5| Manzana|   1.2|       6|
+---+--------+------+--------+

