# Operaciones Básicas con DataFrames

In [0]:
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.appName("Operaciones").getOrCreate()

In [0]:
df = spark.read.csv("dbfs:/FileStore/shared_uploads/jgamarramoreno@gmail.com/appl_stock-1.csv",
                   inferSchema=True,header=True)

In [0]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### Filtrar Datos

Consultas tipo SQL

In [0]:
# Filtrar los registros con Close<200 y mostrarlos
df.filter("Close<200").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|        207.499996|            197.16|            197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|        205.500004|        198.699995|        199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|        202.199995|        190.250002|        192.060003|311488100|         24.883208|
|2010-02-01 00:00:00|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02 00:00:00|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.3

In [0]:
# Filtrar los registros con Close<200 y mostrar el campo Open
df.filter("Close<200").select('Open').show()

+------------------+
|              Open|
+------------------+
|206.78000600000001|
|        204.930004|
|        201.079996|
|192.36999699999998|
|        195.909998|
|        195.169994|
|        196.730003|
|192.63000300000002|
|        195.690006|
|        196.419996|
|        195.889997|
|        194.880001|
|        199.999998|
|         92.699997|
|         94.730003|
|         94.129997|
|         94.040001|
|         92.199997|
|         91.510002|
|         92.309998|
+------------------+
only showing top 20 rows



In [0]:
df.filter("Close<200").select(['Open','Close']).show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|206.78000600000001|            197.75|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
|        195.909998|        195.859997|
|        195.169994|        199.229994|
|        196.730003|        192.050003|
|192.63000300000002|        195.460001|
|        195.690006|194.11999699999998|
|        196.419996|196.19000400000002|
|        195.889997|195.12000700000002|
|        194.880001|        198.669994|
|        199.999998|        197.059998|
|         92.699997|         93.699997|
|         94.730003|             94.25|
|         94.129997|         93.860001|
|         94.040001|         92.290001|
|         92.199997|         91.279999|
|         91.510002|         92.199997|
|         92.309998| 92.08000200000001|
+------------------+------------------+
only showing top 20 rows



Con operadores de comparación Python

In [0]:
df.filter(df["Close"] < 100).show()

+-------------------+-----------------+---------+---------+-----------------+---------+-----------------+
|               Date|             Open|     High|      Low|            Close|   Volume|        Adj Close|
+-------------------+-----------------+---------+---------+-----------------+---------+-----------------+
|2014-06-09 00:00:00|        92.699997|93.879997|    91.75|        93.699997| 75415000|        88.906324|
|2014-06-10 00:00:00|        94.730003|95.050003|    93.57|            94.25| 62777000|        89.428189|
|2014-06-11 00:00:00|        94.129997|94.760002|93.470001|        93.860001| 45681000|        89.058142|
|2014-06-12 00:00:00|        94.040001|94.120003|91.900002|        92.290001| 54749000|        87.568463|
|2014-06-13 00:00:00|        92.199997|92.440002|90.879997|        91.279999| 54525000|        86.610132|
|2014-06-16 00:00:00|        91.510002|    92.75|91.449997|        92.199997| 35561000|        87.483064|
|2014-06-17 00:00:00|        92.309998|92.6999

In [0]:
# Genera un error
df.filter(df["Close"] < 200 & df["Open"] > 200).show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JError[0m                                 Traceback (most recent call last)
[0;32m<command-2538962847791067>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# Genera un error[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf[0m[0;34m.[0m[0mfilter[0m[0;34m([0m[0mdf[0m[0;34m[[0m[0;34m"Close"[0m[0;34m][0m [0;34m<[0m [0;36m200[0m [0;34m&[0m [0mdf[0m[0;34m[[0m[0;34m"Open"[0m[0;34m][0m [0;34m>[0m [0;36m200[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/column.py[0m in [0;36m_[0;34m(self, other)[0m
[1;32m    110[0m     [0;32mdef[0m [0m_[0m[0;34m([0m[0mself[0m[0;34m,[0m [0mother[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m    111[0m         [0mjc[0m [0;34m=[0m [0mother[0m[0;34m.[0m[0m_jc[0m [0;32mi

In [0]:
# uso de and &
df.filter((df["Close"] < 200) & (df["Open"] > 200)).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [0]:
# uso de or |
df.filter((df["Close"] < 200) | (df["Open"] > 200)).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [0]:
# uso de not ~
df.filter((df["Close"] < 200) & ~(df["Open"] < 200)).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [0]:
# uso de igualdad ==
df.filter(df["Low"] == 190.250002).show()

+-------------------+----------+----------+----------+----------+---------+---------+
|               Date|      Open|      High|       Low|     Close|   Volume|Adj Close|
+-------------------+----------+----------+----------+----------+---------+---------+
|2010-01-29 00:00:00|201.079996|202.199995|190.250002|192.060003|311488100|24.883208|
+-------------------+----------+----------+----------+----------+---------+---------+



### Uso de resultados como objetos Python

In [0]:
# devuelve el resultado como un objeto Python
df.filter(df["Low"] == 190.250002).collect()

Out[21]: [Row(Date=datetime.datetime(2010, 1, 29, 0, 0), Open=201.079996, High=202.199995, Low=190.250002, Close=192.060003, Volume=311488100, Adj Close=24.883208)]

In [0]:
resultado = df.filter(df["Low"] == 190.250002).collect()

In [0]:
type(resultado)

Out[23]: list

In [0]:
type(resultado[0])

Out[24]: pyspark.sql.types.Row

In [0]:
fila = resultado[0]

In [0]:
# Devuelve un diccionario
fila.asDict()

Out[26]: {'Date': datetime.datetime(2010, 1, 29, 0, 0),
 'Open': 201.079996,
 'High': 202.199995,
 'Low': 190.250002,
 'Close': 192.060003,
 'Volume': 311488100,
 'Adj Close': 24.883208}

In [0]:
for item in resultado[0]:
    print(item)

2010-01-29 00:00:00
201.079996
202.199995
190.250002
192.060003
311488100
24.883208
