In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

In [3]:
# Create (or reuse an existing) Spark session for this notebook
spark = (
    SparkSession.builder
    .appName('DataFrame')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/23 20:36:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load personas.json without a schema, so Spark infers the column types
df = spark.read.format('json').load('personas.json')

                                                                                

In [5]:
# Display the rows (arguments spelled out; these are show()'s defaults)
df.show(n=20, truncate=True)

+----+------+
|edad|nombre|
+----+------+
|NULL|Miguel|
|  25|Carlos|
|  19|  Juan|
+----+------+



In [6]:
# Print the column names, types and nullability inferred from personas.json
df.printSchema()

root
 |-- edad: long (nullable = true)
 |-- nombre: string (nullable = true)



In [7]:
# Column names as a list, taken directly from the DataFrame's schema
df.schema.fieldNames()

['edad', 'nombre']

In [8]:
# Summary statistics (count, mean, stddev, min, max) for every column;
# mean/stddev are NULL for the string column
df.describe().show()

[Stage 2:>                                                          (0 + 1) / 1]

+-------+-----------------+------+
|summary|             edad|nombre|
+-------+-----------------+------+
|  count|                2|     3|
|   mean|             22.0|  NULL|
| stddev|4.242640687119285|  NULL|
|    min|               19|Carlos|
|    max|               25|Miguel|
+-------+-----------------+------+



[Stage 4:>                                                          (0 + 1) / 1]                                                                                

### schema

In [9]:
# Explicit field list: edad as integer (instead of the inferred long); both nullable
schema = [
    StructField('edad', IntegerType(), nullable=True),
    StructField('nombre', StringType(), nullable=True),
]

In [10]:
# Wrap the field list in a StructType so it can be handed to the reader
schema_final = StructType(schema)

In [11]:
# Re-read personas.json, applying the explicit schema instead of inference
df = spark.read.schema(schema_final).json('personas.json')

In [12]:
# Confirm the explicit schema took effect: edad is now integer rather than long
df.printSchema()

root
 |-- edad: integer (nullable = true)
 |-- nombre: string (nullable = true)



In [13]:
# Same rows as before; only the declared column types changed
df.show(n=20, truncate=True)

+----+------+
|edad|nombre|
+----+------+
|NULL|Miguel|
|  25|Carlos|
|  19|  Juan|
+----+------+



### select

In [14]:
# Attribute access yields a Column expression, not the column's data
df.edad

Column<'edad'>

In [15]:
# Confirm the expression's class
type(df.edad)

pyspark.sql.column.Column

In [16]:
# Project a single column by passing the Column expression
df.select(df.edad).show()

+----+
|edad|
+----+
|NULL|
|  25|
|  19|
+----+



In [17]:
# select() returns a new DataFrame, not a Column
type(df.select(df.edad))

pyspark.sql.dataframe.DataFrame

In [18]:
# Bring the first two rows to the driver as a list of Row objects
df.take(2)

[Row(edad=None, nombre='Miguel'), Row(edad=25, nombre='Carlos')]

In [19]:
# Fetch only the first Row; first() returns None on an empty DataFrame
# instead of raising IndexError the way df.head(2)[0] would
df.first()

Row(edad=None, nombre='Miguel')

In [20]:
# Class of a single fetched row (fetches one row instead of two)
type(df.first())

pyspark.sql.types.Row

In [21]:
# Select several columns, in the order given
df.select(['nombre', 'edad']).show()

+------+----+
|nombre|edad|
+------+----+
|Miguel|NULL|
|Carlos|  25|
|  Juan|  19|
+------+----+



### crear una nueva columna con withColumn

In [22]:
# Derive a column; withColumn returns a new DataFrame (df is unchanged)
# and a NULL edad yields a NULL edad_nueva
df.withColumn('edad_nueva', df.edad * 2).show()

+----+------+----------+
|edad|nombre|edad_nueva|
+----+------+----------+
|NULL|Miguel|      NULL|
|  25|Carlos|        50|
|  19|  Juan|        38|
+----+------+----------+



In [23]:
### cambiar el nombre de una columna con withColumnRenamed

In [24]:
# Rename a column; again this returns a new DataFrame
df.withColumnRenamed(existing='edad', new='nueva_edad').show()

+----------+------+
|nueva_edad|nombre|
+----------+------+
|      NULL|Miguel|
|        25|Carlos|
|        19|  Juan|
+----------+------+



### Utilizar el lenguaje SQL para realizar una consulta

In [25]:
# Register df under a name that SQL queries can reference
df.createOrReplaceTempView(name='personas')

In [26]:
# SQL query against the registered view (lazy; nothing runs until an action)
query = spark.sql(sqlQuery='select * from personas')

In [27]:
# Execute the query and display the result
query.show(n=20, truncate=True)

+----+------+
|edad|nombre|
+----+------+
|NULL|Miguel|
|  25|Carlos|
|  19|  Juan|
+----+------+



In [28]:
# Rows with NULL edad are dropped: NULL > 20 does not evaluate to true
mayor_veinte = spark.sql(sqlQuery='select * from personas where edad > 20')

In [29]:
# Display the filtered rows
mayor_veinte.show(n=20, truncate=True)

+----+------+
|edad|nombre|
+----+------+
|  25|Carlos|
+----+------+



### filter y where

In [30]:
# Load the AAPL CSV: the header row supplies column names, inferSchema types them
df = spark.read.options(inferSchema=True, header=True).csv('AAPL.csv')

In [31]:
# Preview the first 20 rows
df.show(n=20, truncate=True)

+----------+---------+---------+---------+---------+---------+---------+
|      Date|     Open|     High|      Low|    Close|Adj Close|   Volume|
+----------+---------+---------+---------+---------+---------+---------+
|2010-01-04|    30.49|30.642857|    30.34|30.572857|26.466835|123432400|
|2010-01-05|30.657143|30.798571|30.464285|30.625713|26.512596|150476200|
|2010-01-06|30.625713|30.747143|30.107143|30.138571|26.090879|138040000|
|2010-01-07|    30.25|30.285715|29.864286|30.082857|26.042646|119282800|
|2010-01-08|30.042856|30.285715|29.865715|30.282858|26.215786|111902700|
|2010-01-11|     30.4|30.428572|29.778572|30.015715|25.984528|115557400|
|2010-01-12|29.884285|29.967142|29.488571|29.674286|25.688946|148614900|
|2010-01-13|29.695715|30.132856|29.157143|30.092857|26.051304|151473000|
|2010-01-14|30.015715|30.065714|29.860001|29.918571|25.900436|108223500|
|2010-01-15|30.132856|30.228571|    29.41|29.418571|25.467583|148516900|
|2010-01-19|29.761429|30.741428|29.605715|30.719999

In [32]:
# Inspect the types that inferSchema=True produced for AAPL.csv
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [33]:
# Fetch just the first Row (head(3)[0] pulled three rows to use one);
# first() returns None rather than raising if the DataFrame is empty
df.first()

Row(Date=datetime.date(2010, 1, 4), Open=30.49, High=30.642857, Low=30.34, Close=30.572857, Adj Close=26.466835, Volume=123432400)

In [34]:
# Filter with a SQL expression string; where() is an alias of filter()
df.where('Close < 30').select('Open', 'Close').show()

+---------+---------+
|     Open|    Close|
+---------+---------+
|29.884285|29.674286|
|30.015715|29.918571|
|30.132856|29.418571|
|30.297142|29.724285|
|29.540001|    28.25|
|    28.93|    29.01|
|29.421429|    29.42|
|29.549999|29.697144|
|29.275715|28.469999|
|28.725714|27.437143|
|27.481428|27.818571|
|27.987143|    27.98|
|27.881428|28.461428|
|28.104286|27.435715|
|27.518572|27.922857|
|27.955715|27.731428|
|28.059999|28.027143|
|27.984285|27.874287|
|    27.84|28.381428|
|28.301428|28.625713|
+---------+---------+
only showing top 20 rows



In [35]:
# Same filter, expressed with a Column comparison instead of a string
df.filter(df.Close < 30).select('Open', 'Close').show()

+---------+---------+
|     Open|    Close|
+---------+---------+
|29.884285|29.674286|
|30.015715|29.918571|
|30.132856|29.418571|
|30.297142|29.724285|
|29.540001|    28.25|
|    28.93|    29.01|
|29.421429|    29.42|
|29.549999|29.697144|
|29.275715|28.469999|
|28.725714|27.437143|
|27.481428|27.818571|
|27.987143|    27.98|
|27.881428|28.461428|
|28.104286|27.435715|
|27.518572|27.922857|
|27.955715|27.731428|
|28.059999|28.027143|
|27.984285|27.874287|
|    27.84|28.381428|
|28.301428|28.625713|
+---------+---------+
only showing top 20 rows



In [36]:
# OR of two conditions; parentheses are required because | binds tighter than <
df.filter((df.Close < 30) | (df.Open < 30)).select('Open', 'Close').show()

+---------+---------+
|     Open|    Close|
+---------+---------+
|29.884285|29.674286|
|29.695715|30.092857|
|30.015715|29.918571|
|30.132856|29.418571|
|29.761429|30.719999|
|30.297142|29.724285|
|29.540001|    28.25|
|    28.93|    29.01|
|29.421429|    29.42|
|29.549999|29.697144|
|29.275715|28.469999|
|28.725714|27.437143|
|27.481428|27.818571|
|27.987143|    27.98|
|27.881428|28.461428|
|28.104286|27.435715|
|27.518572|27.922857|
|27.955715|27.731428|
|28.059999|28.027143|
|27.984285|27.874287|
+---------+---------+
only showing top 20 rows



In [37]:
# AND of two conditions, written as two chained filters
df.filter(df.Close < 30).filter(df.Open < 30).select('Open', 'Close').show()

+---------+---------+
|     Open|    Close|
+---------+---------+
|29.884285|29.674286|
|29.540001|    28.25|
|    28.93|    29.01|
|29.421429|    29.42|
|29.549999|29.697144|
|29.275715|28.469999|
|28.725714|27.437143|
|27.481428|27.818571|
|27.987143|    27.98|
|27.881428|28.461428|
|28.104286|27.435715|
|27.518572|27.922857|
|27.955715|27.731428|
|28.059999|28.027143|
|27.984285|27.874287|
|    27.84|28.381428|
|28.301428|28.625713|
|28.848572|29.057142|
|    29.17|28.935715|
|28.804285|    28.99|
+---------+---------+
only showing top 20 rows



In [38]:
# Close < 30 AND NOT (Open < 30), written as two chained filters
df.filter(df.Close < 30).filter(~(df.Open < 30)).select('Open', 'Close').show()

+---------+---------+
|     Open|    Close|
+---------+---------+
|30.015715|29.918571|
|30.132856|29.418571|
|30.297142|29.724285|
+---------+---------+



In [39]:
# Exact equality on a double column. It matches a row here, but for computed
# floating-point values a tolerance (e.g. between()) may be needed instead.
df.where(df.Low == 30.34).show()

+----------+-----+---------+-----+---------+---------+---------+
|      Date| Open|     High|  Low|    Close|Adj Close|   Volume|
+----------+-----+---------+-----+---------+---------+---------+
|2010-01-04|30.49|30.642857|30.34|30.572857|26.466835|123432400|
+----------+-----+---------+-----+---------+---------+---------+



In [40]:
# collect() brings every matching row to the driver as a list of Row objects
resultado = df.where(df.Low == 30.34).collect()

In [41]:
# First collected Row (raises IndexError if the filter matched nothing)
resultado[0]

Row(Date=datetime.date(2010, 1, 4), Open=30.49, High=30.642857, Low=30.34, Close=30.572857, Adj Close=26.466835, Volume=123432400)

In [42]:
# Keep the first matching Row in a named variable for field access
fila = resultado[0]

In [43]:
# Row supports lookup by field name directly; no need to build a dict via asDict()
fila['Volume']

123432400

### agrupaciones

In [46]:
# Load the sales CSV: header row for names, inferSchema for types
df = spark.read.options(inferSchema=True, header=True).csv('ventas.csv')

In [47]:
# Preview the sales rows
df.show(n=20, truncate=True)

+-------+-------+------+
|Empresa|Persona|Ventas|
+-------+-------+------+
|   GOOG| Carlos|   200|
|   GOOG|   Juan|   120|
|   GOOG| Felipe|   340|
|   MSFT|   Tina|   600|
|   MSFT| Andrea|   124|
|   MSFT|  Carla|   243|
|     FB|   Sara|   870|
|     FB|Ignacio|   350|
|   APPL| Miguel|   250|
|   APPL|  Oscar|   130|
|   APPL|  Jorge|   750|
|   APPL|   Ivan|   350|
+-------+-------+------+



In [48]:
# Inspect the types inferred for ventas.csv (Ventas comes out as integer)
df.printSchema()

root
 |-- Empresa: string (nullable = true)
 |-- Persona: string (nullable = true)
 |-- Ventas: integer (nullable = true)



### groupBy

In [49]:
# Total Ventas per Empresa. Naming the column keeps the aggregation from silently
# summing any other numeric columns; group output order is not guaranteed.
df.groupBy('Empresa').sum('Ventas').show()

+-------+-----------+
|Empresa|sum(Ventas)|
+-------+-----------+
|   APPL|       1480|
|   GOOG|        660|
|     FB|       1220|
|   MSFT|        967|
+-------+-----------+

