# Preliminaries

1. Pull the container image if you have not already done so:

* `docker pull jdvelasq/pyspark:2.4.7-pseudo`

2. Run the container in a Visual Studio Code terminal:
* `docker run --rm -it -v C:\dataspark:/datalake -p 8088:8088 jdvelasq/pyspark:2.4.7-pseudo`

3. Remember to attach Visual Studio Code to the running container. If the option does not appear, install the Remote - Containers extension.

4. Launch the IDE (integrated development environment) from the container's terminal:
* `jupyter lab --ip=0.0.0.0`

* Alternatively, use the Visual Studio Code editor

* Or use the interactive pyspark console: `pyspark --jars /datalake/flint-0.6.0.jar --py-files /datalake/flint-0.6.0.jar`

# Installing the libraries for time series work in PySpark

In [1]:
# Cython lets Python code call functions, classes, and types from the C language
# It is a requirement for ts.flint
!pip3 install Cython

# pyarrow is a C++-based library for serializing arrays; it enables
# integration between pandas, NumPy, Spark, ...
# https://pypi.org/project/pyarrow/
# It is a requirement for ts.flint
!pip3 install pyarrow==0.9.0

# ts-flint is a collection of modules for time series analysis with PySpark
# https://ts-flint.readthedocs.io/en/latest/
!pip3 install ts-flint

# Download the Flint Java library
# It will be saved in the /datalake folder
!wget https://repo1.maven.org/maven2/com/twosigma/flint/0.6.0/flint-0.6.0.jar

--2021-04-24 02:28:56--  https://repo1.maven.org/maven2/com/twosigma/flint/0.6.0/flint-0.6.0.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.4.209, 199.232.32.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.4.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2171677 (2.1M) [application/java-archive]
Saving to: 'flint-0.6.0.jar.4'


2021-04-24 02:28:57 (3.95 MB/s) - 'flint-0.6.0.jar.4' saved [2171677/2171677]



# Implementation

In [2]:
import os
# Required; see https://github.com/twosigma/flint/blob/master/python/README.md
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /datalake/flint-0.6.0.jar --py-files /datalake/flint-0.6.0.jar pyspark-shell'

# Library used to locate the Spark installation
import findspark
findspark.init()

# Libraries used to manage the Spark service
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession

# Create a Spark application on the service
# Be careful with accents or special characters in the app name
AppSpark = SparkConf().setAppName("Valores y Acciones SA")

# Define a context for the app
ContextoSpark = SparkContext(conf=AppSpark)

# Start a session in the app's context
SesionSpark = SparkSession(ContextoSpark)

# Start the SQL context
ContextoSql = SQLContext(sparkContext=ContextoSpark, sparkSession=SesionSpark)
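
The value of `PYSPARK_SUBMIT_ARGS` set above repeats the jar path twice and ends with the `pyspark-shell` token. As a minimal sketch, the string could be assembled from a single path; the helper name `build_submit_args` and the existence check are illustrative assumptions, not part of Flint or PySpark.

```python
import os

def build_submit_args(jar_path):
    """Return a PYSPARK_SUBMIT_ARGS value for the given Flint jar (hypothetical helper)."""
    # The same jar is passed to --jars (JVM side) and --py-files (Python side);
    # the value ends with the 'pyspark-shell' token, as in the cell above.
    return "--jars {0} --py-files {0} pyspark-shell".format(jar_path)

FLINT_JAR = "/datalake/flint-0.6.0.jar"  # path used in this notebook

if not os.path.isfile(FLINT_JAR):
    # Illustrative guard: warn early instead of failing later when Spark starts.
    print("Warning: {} not found; download it before starting Spark".format(FLINT_JAR))

submit_args = build_submit_args(FLINT_JAR)
print(submit_args)
```

The resulting string would be assigned to `os.environ['PYSPARK_SUBMIT_ARGS']` before `findspark.init()` is called, as in the cell above.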

In [4]:
# List the tables registered in Spark SQL
ContextoSql.sql("SHOW TABLES").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [5]:
# Now load the data into the Hadoop file system (FS)
# Remember that the local file is in the /datalake folder
# If the file already exists in HDFS, -put fails with "File exists"; -put -f overwrites it

!hadoop fs -put sp500.csv
!hadoop fs -ls

put: `sp500.csv': File exists
Found 1 items
-rw-r--r--   1 root supergroup    1246208 2021-04-24 02:00 sp500.csv


In [6]:
# Create a table from CSV data
# The CSV is read from the Hadoop FS
# https://www.learningjournal.guru/courses/spark/spark-foundation-training/spark-sql-database-and-table/
# https://spark.apache.org/docs/latest/sql-ref-syntax-ddl.html

ContextoSql.sql("""
CREATE TABLE 
    sp500_crudo 
USING com.databricks.spark.csv 
OPTIONS (
    path 'sp500.csv', 
    header 'true')
""");

In [7]:
ContextoSql.sql("SHOW TABLES").show()

+--------+-----------+-----------+
|database|  tableName|isTemporary|
+--------+-----------+-----------+
| default|sp500_crudo|      false|
+--------+-----------+-----------+



In [8]:
consulta = ContextoSql.sql("Select * from sp500_crudo")
consulta.show(10)

+---+-------------------+-----+-----+-----+-----+---------+-------+
| id|               Date| Open| High|  Low|Close|Adj Close| Volume|
+---+-------------------+-----+-----+-----+-----+---------+-------+
|  1|1950-01-03 00:00:00|16.66|16.66|16.66|16.66|    16.66|1260000|
|  2|1950-01-04 00:00:00|16.85|16.85|16.85|16.85|    16.85|1890000|
|  3|1950-01-05 00:00:00|16.93|16.93|16.93|16.93|    16.93|2550000|
|  4|1950-01-06 00:00:00|16.98|16.98|16.98|16.98|    16.98|2010000|
|  5|1950-01-09 00:00:00|17.08|17.08|17.08|17.08|    17.08|2520000|
|  6|1950-01-10 00:00:00|17.03|17.03|17.03|17.03|    17.03|2160000|
|  7|1950-01-11 00:00:00|17.09|17.09|17.09|17.09|    17.09|2630000|
|  8|1950-01-12 00:00:00|16.76|16.76|16.76|16.76|    16.76|2970000|
|  9|1950-01-13 00:00:00|16.67|16.67|16.67|16.67|    16.67|3330000|
| 10|1950-01-16 00:00:00|16.72|16.72|16.72|16.72|    16.72|1460000|
+---+-------------------+-----+-----+-----+-----+---------+-------+
only showing top 10 rows



In [9]:
consulta = ContextoSql.sql("Select date, close from sp500_crudo")
consulta.show(10)
print("Data type:", type(consulta))
consulta.printSchema()

+-------------------+-----+
|               date|close|
+-------------------+-----+
|1950-01-03 00:00:00|16.66|
|1950-01-04 00:00:00|16.85|
|1950-01-05 00:00:00|16.93|
|1950-01-06 00:00:00|16.98|
|1950-01-09 00:00:00|17.08|
|1950-01-10 00:00:00|17.03|
|1950-01-11 00:00:00|17.09|
|1950-01-12 00:00:00|16.76|
|1950-01-13 00:00:00|16.67|
|1950-01-16 00:00:00|16.72|
+-------------------+-----+
only showing top 10 rows

Data type: <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- date: string (nullable = true)
 |-- close: string (nullable = true)



In [10]:
consulta = ContextoSql.sql("describe sp500_crudo")
consulta.show()

+---------+---------+-------+
| col_name|data_type|comment|
+---------+---------+-------+
|       id|   string|   null|
|     Date|   string|   null|
|     Open|   string|   null|
|     High|   string|   null|
|      Low|   string|   null|
|    Close|   string|   null|
|Adj Close|   string|   null|
|   Volume|   string|   null|
+---------+---------+-------+



In [11]:
dfSp500Crudo = ContextoSql.table('sp500_crudo')

# Show the variable's type
print("Variable type", type(dfSp500Crudo))

# Show the schema
dfSp500Crudo.printSchema()

Variable type <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- id: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)
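
Every column of `sp500_crudo` is a string because the table was created without a declared schema. The same effect can be reproduced outside Spark with the standard library: a CSV reader returns text, and each field has to be converted explicitly. The sketch below uses two rows copied from the output shown earlier; the exact layout of the real `sp500.csv` file and the helper `parse_row` are assumptions made for illustration.

```python
import csv
import io
from datetime import datetime

# Two rows copied from the table output above; the real file layout is assumed.
SAMPLE = """id,Date,Open,High,Low,Close,Adj Close,Volume
1,1950-01-03 00:00:00,16.66,16.66,16.66,16.66,16.66,1260000
2,1950-01-04 00:00:00,16.85,16.85,16.85,16.85,16.85,1890000
"""

def parse_row(raw):
    """Convert one row of strings into typed values (hypothetical helper)."""
    return {
        "id": int(raw["id"]),
        "date": datetime.strptime(raw["Date"], "%Y-%m-%d %H:%M:%S"),
        "open": float(raw["Open"]),
        "close": float(raw["Close"]),
        "volume": int(raw["Volume"]),
    }

raw_rows = list(csv.DictReader(io.StringIO(SAMPLE)))
typed_rows = [parse_row(r) for r in raw_rows]

print(type(raw_rows[0]["Close"]))    # type of a raw field
print(type(typed_rows[0]["close"]))  # type after explicit conversion
```

Declaring the column types when the table is created plays the same role inside Spark as `parse_row` does here.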



In [12]:
# Load the data with the appropriate column types
# Remember that the CSV must be in the Hadoop file system
ContextoSql.sql("""
CREATE TABLE IF NOT EXISTS 
sp500 (
        id LONG,
        date TIMESTAMP,
        open FLOAT,
        high FLOAT,
        low FLOAT,
        close FLOAT,
        adj_close FLOAT,
        volume BIGINT
        ) 
USING CSV OPTIONS ( 
    header='true', 
    nullvalue='NA', 
    timestampFormat=\"yyyy-MM-dd'T'HH:mm:ss\", 
    path='sp500.csv')
    """); 

In [13]:
ContextoSql.sql("SHOW TABLES").show()

consulta = ContextoSql.sql("Select * from sp500_crudo")
consulta.show(10)

ContextoSql.sql("describe sp500").show()

+--------+-----------+-----------+
|database|  tableName|isTemporary|
+--------+-----------+-----------+
| default|      sp500|      false|
| default|sp500_crudo|      false|
+--------+-----------+-----------+

+---+-------------------+-----+-----+-----+-----+---------+-------+
| id|               Date| Open| High|  Low|Close|Adj Close| Volume|
+---+-------------------+-----+-----+-----+-----+---------+-------+
|  1|1950-01-03 00:00:00|16.66|16.66|16.66|16.66|    16.66|1260000|
|  2|1950-01-04 00:00:00|16.85|16.85|16.85|16.85|    16.85|1890000|
|  3|1950-01-05 00:00:00|16.93|16.93|16.93|16.93|    16.93|2550000|
|  4|1950-01-06 00:00:00|16.98|16.98|16.98|16.98|    16.98|2010000|
|  5|1950-01-09 00:00:00|17.08|17.08|17.08|17.08|    17.08|2520000|
|  6|1950-01-10 00:00:00|17.03|17.03|17.03|17.03|    17.03|2160000|
|  7|1950-01-11 00:00:00|17.09|17.09|17.09|17.09|    17.09|2630000|
|  8|1950-01-12 00:00:00|16.76|16.76|16.76|16.76|    16.76|2970000|
|  9|1950-01-13 00:00:00|16.67|16.67|16.6

In [14]:
consulta = ContextoSql.sql("Select * from sp500_crudo ORDER By date DESC")
consulta.show(10)

+-----+-------------------+-------+-------+-------+-------+---------+----------+
|   id|               Date|   Open|   High|    Low|  Close|Adj Close|    Volume|
+-----+-------------------+-------+-------+-------+-------+---------+----------+
|17356|2018-12-21 00:00:00|2465.38|2504.41|2408.55|2416.62|  2416.62|7609010000|
|17355|2018-12-20 00:00:00|2496.77|2509.63|2441.18|2467.42|  2467.42|5585780000|
|17354|2018-12-19 00:00:00|2547.05|2585.29|2488.96|2506.96|  2506.96|5127940000|
|17353|2018-12-18 00:00:00| 2559.9|2573.99|2528.71|2546.16|  2546.16|4470880000|
|17352|2018-12-17 00:00:00|2590.75|2601.13|2530.54|2545.94|  2545.94|4616350000|
|17351|2018-12-14 00:00:00|2629.68|2635.07|2593.84|2599.95|  2599.95|4035020000|
|17350|2018-12-13 00:00:00| 2658.7|2670.19|2637.27|2650.54|  2650.54|3927720000|
|17349|2018-12-12 00:00:00|2658.23|2685.44|2650.26|2651.07|  2651.07|3955890000|
|17348|2018-12-11 00:00:00|2664.44|2674.35| 2621.3|2636.78|  2636.78|3905870000|
|17347|2018-12-10 00:00:00|2

In [15]:
consulta = ContextoSql.sql("Select * from sp500_crudo where open = close ORDER By date DESC")
consulta.show(10)
print("Found: ", consulta.count(), " records.")

+-----+-------------------+-------+-------+-------+-------+---------+----------+
|   id|               Date|   Open|   High|    Low|  Close|Adj Close|    Volume|
+-----+-------------------+-------+-------+-------+-------+---------+----------+
|16927|2017-04-10 00:00:00|2357.16|2366.37| 2351.5|2357.16|  2357.16|2785410000|
|15292|2010-10-11 00:00:00|1165.32|1168.68|1162.02|1165.32|  1165.32|2505900000|
|14207|2006-06-20 00:00:00|1240.12|1249.01|1238.87|1240.12|  1240.12|2232950000|
|11844|1997-01-28 00:00:00| 765.02| 776.32| 761.75| 765.02|   765.02| 541580000|
|11327|1995-01-12 00:00:00| 461.64| 461.93| 460.63| 461.64|   461.64| 313040000|
|11142|1994-04-19 00:00:00| 442.54| 444.82| 438.83| 442.54|   442.54| 323280000|
|10732|1992-09-03 00:00:00| 417.98| 420.31| 417.49| 417.98|   417.98| 212500000|
| 9784|1988-12-05 00:00:00| 274.93| 275.62| 271.81| 274.93|   274.93| 144660000|
| 9280|1986-12-08 00:00:00| 251.16| 252.36| 248.82| 251.16|   251.16| 159000000|
| 9229|1986-09-25 00:00:00| 

In [16]:
consulta = ContextoSql.sql("Select * from sp500_crudo where date > '2017-00-00'")
consulta.show(10)
print("Found: ", consulta.count(), " records.")

+-----+-------------------+-------+-------+-------+-------+---------+----------+
|   id|               Date|   Open|   High|    Low|  Close|Adj Close|    Volume|
+-----+-------------------+-------+-------+-------+-------+---------+----------+
|16860|2017-01-03 00:00:00|2251.57|2263.88|2245.13|2257.83|  2257.83|3770530000|
|16861|2017-01-04 00:00:00| 2261.6|2272.82| 2261.6|2270.75|  2270.75|3764890000|
|16862|2017-01-05 00:00:00|2268.18| 2271.5|2260.45|   2269|     2269|3761820000|
|16863|2017-01-06 00:00:00|2271.14| 2282.1|2264.06|2276.98|  2276.98|3339890000|
|16864|2017-01-09 00:00:00|2273.59|2275.49| 2268.9| 2268.9|   2268.9|3217610000|
|16865|2017-01-10 00:00:00|2269.72|2279.27|2265.27| 2268.9|   2268.9|3638790000|
|16866|2017-01-11 00:00:00| 2268.6|2275.32|2260.83|2275.32|  2275.32|3620410000|
|16867|2017-01-12 00:00:00|2271.14|2271.78|2254.25|2270.44|  2270.44|3462130000|
|16868|2017-01-13 00:00:00|2272.74|2278.68|2271.51|2274.64|  2274.64|3081270000|
|16869|2017-01-17 00:00:00|2

In [17]:
consulta = ContextoSql.sql("Select * from sp500_crudo where date > '2017-00-00' and date < '2018-00-00'")
consulta.show(10)
print("Found: ", consulta.count(), " records.")

+-----+-------------------+-------+-------+-------+-------+---------+----------+
|   id|               Date|   Open|   High|    Low|  Close|Adj Close|    Volume|
+-----+-------------------+-------+-------+-------+-------+---------+----------+
|16860|2017-01-03 00:00:00|2251.57|2263.88|2245.13|2257.83|  2257.83|3770530000|
|16861|2017-01-04 00:00:00| 2261.6|2272.82| 2261.6|2270.75|  2270.75|3764890000|
|16862|2017-01-05 00:00:00|2268.18| 2271.5|2260.45|   2269|     2269|3761820000|
|16863|2017-01-06 00:00:00|2271.14| 2282.1|2264.06|2276.98|  2276.98|3339890000|
|16864|2017-01-09 00:00:00|2273.59|2275.49| 2268.9| 2268.9|   2268.9|3217610000|
|16865|2017-01-10 00:00:00|2269.72|2279.27|2265.27| 2268.9|   2268.9|3638790000|
|16866|2017-01-11 00:00:00| 2268.6|2275.32|2260.83|2275.32|  2275.32|3620410000|
|16867|2017-01-12 00:00:00|2271.14|2271.78|2254.25|2270.44|  2270.44|3462130000|
|16868|2017-01-13 00:00:00|2272.74|2278.68|2271.51|2274.64|  2274.64|3081270000|
|16869|2017-01-17 00:00:00|2
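
The `Date` column of `sp500_crudo` is a string, so the two filters above compare text, not dates. This still selects the right rows because a `YYYY-MM-DD HH:MM:SS` string sorts the same way as the date it represents. The sketch below illustrates the idea in plain Python; the four date values are illustrative, not read from the table.

```python
# Dates written as 'YYYY-MM-DD HH:MM:SS' sort identically as text and as dates,
# because the most significant field comes first and every field is zero-padded.
dates = [
    "2016-12-30 00:00:00",
    "2017-01-03 00:00:00",
    "2017-12-29 00:00:00",
    "2018-01-02 00:00:00",
]

# '2017-00-00' is not a valid calendar date; it only works as a text bound
# that sorts before every real date in 2017.
in_2017 = [d for d in dates if "2017-00-00" < d < "2018-00-00"]
print(in_2017)
```

On a column declared as `TIMESTAMP`, the outcome of such a comparison depends on how the Spark version coerces the string, so a valid bound such as `'2017-01-01'` is the safer choice there.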

In [18]:
miTabla = ContextoSql.table('sp500')

# show() prints the rows and returns None, so its result is not assigned
miTabla.select(['id', 'close']).show(5)

miTabla.select(['id', 'date', 'open']).filter(miTabla['open'] > 2500).show(5)

print(miTabla.head(3))

+---+-----+
| id|close|
+---+-----+
|  1|16.66|
|  2|16.85|
|  3|16.93|
|  4|16.98|
|  5|17.08|
+---+-----+
only showing top 5 rows

+-----+-------------------+-------+
|   id|               date|   open|
+-----+-------------------+-------+
|17038|2017-09-18 00:00:00|2502.51|
|17039|2017-09-19 00:00:00|2506.29|
|17040|2017-09-20 00:00:00|2506.84|
|17041|2017-09-21 00:00:00|2507.16|
|17044|2017-09-26 00:00:00|2501.04|
+-----+-------------------+-------+
only showing top 5 rows

[Row(id=1, date=datetime.datetime(1950, 1, 3, 0, 0), open=16.65999984741211, high=16.65999984741211, low=16.65999984741211, close=16.65999984741211, adj_close=16.65999984741211, volume=1260000), Row(id=2, date=datetime.datetime(1950, 1, 4, 0, 0), open=16.850000381469727, high=16.850000381469727, low=16.850000381469727, close=16.850000381469727, adj_close=16.850000381469727, volume=1890000), Row(id=3, date=datetime.datetime(1950, 1, 5, 0, 0), open=16.93000030517578, high=16.93000030517578, low=16.93000030517578, c
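
The `Row` objects above show `16.65999984741211` where the CSV contains `16.66`. A likely explanation is that the price columns of `sp500` were declared as `FLOAT`, a 32-bit type, and the value is widened to a 64-bit Python float when it is collected. The sketch below reproduces that rounding with the standard library; `as_float32` is a hypothetical helper.

```python
import struct

def as_float32(value):
    """Round a 64-bit Python float to the nearest 32-bit float and widen it back."""
    return struct.unpack("f", struct.pack("f", value))[0]

stored = as_float32(16.66)
print(stored)            # close to 16.66, but not identical
print(stored == 16.66)   # the round trip does not preserve the value
```

If this precision loss matters, declaring the columns as `DOUBLE` or `DECIMAL` in the `CREATE TABLE` statement is one option to consider.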

In [19]:
# Aggregations

from pyspark.sql import functions as F

miTabla.agg(F.avg('open').alias('promOpen'), F.avg('close').alias('promClose')).show()
                                                                  
miTabla.agg(F.max('open').alias('maxOpen'), F.max('close').alias('maxClose')).show()

miTabla.agg(F.min('open').alias('minOpen'), F.min('close').alias('minClose')).show()


+----------------+-----------------+
|        promOpen|        promClose|
+----------------+-----------------+
|568.127855878295|568.2315907784941|
+----------------+-----------------+

+-------+--------+
|maxOpen|maxClose|
+-------+--------+
|2936.76| 2930.75|
+-------+--------+

+-------+--------+
|minOpen|minClose|
+-------+--------+
|  16.66|   16.66|
+-------+--------+



In [20]:
mi2017 = miTabla.filter((miTabla['date'] > '2017-00-00') & (miTabla['date'] < '2018-00-00'))

mi2017.agg(F.max('open').alias('maxOpen'), F.max('close').alias('maxClose')).show()

+-------+--------+
|maxOpen|maxClose|
+-------+--------+
|2692.71| 2690.16|
+-------+--------+



In [37]:
# Group by year

precioPromAnual = ContextoSql.sql(""" Select year(date) as year, avg(open) as avgOpen, round(avg(close)) as avgClose,
                                      max(open) as mxOpen
                                  from sp500 group by year(date) 
                                  order by year(date)""")
precioPromAnual.show(10)
print("Found: ", precioPromAnual.count(), " records.")

print(type(precioPromAnual))

+----+------------------+--------+------+
|year|           avgOpen|avgClose|mxOpen|
+----+------------------+--------+------+
|1950|18.397269145551935|    18.0| 20.43|
|1951|22.321887594629004|    22.0| 23.85|
|1952|24.496160057067872|    24.0| 26.59|
|1953|24.722589675173815|    25.0| 26.66|
|1954|29.724087359413268|    30.0| 35.98|
|1955| 40.49884920271616|    40.0| 46.41|
|1956|46.639521959768345|    47.0| 49.64|
|1957| 44.42337299528576|    44.0| 49.13|
|1958|46.203452367631215|    46.0| 55.21|
|1959| 57.41818169454341|    57.0| 60.71|
+----+------------------+--------+------+
only showing top 10 rows

Found:  69  records.
<class 'pyspark.sql.dataframe.DataFrame'>
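
To make explicit what the `GROUP BY year(date)` query computes, here is a minimal plain-Python sketch of the same aggregation over a handful of rows copied from earlier outputs. It is only an illustration of the logic, not of how Spark executes the query.

```python
from collections import defaultdict
from datetime import datetime

# (date, open, close) values copied from rows displayed earlier in this notebook.
rows = [
    ("1950-01-03 00:00:00", 16.66, 16.66),
    ("1950-01-04 00:00:00", 16.85, 16.85),
    ("1950-01-05 00:00:00", 16.93, 16.93),
    ("2017-01-03 00:00:00", 2251.57, 2257.83),
    ("2017-01-04 00:00:00", 2261.6, 2270.75),
]

# GROUP BY year(date): collect the prices of each year.
groups = defaultdict(list)
for date_text, open_price, close_price in rows:
    year = datetime.strptime(date_text, "%Y-%m-%d %H:%M:%S").year
    groups[year].append((open_price, close_price))

summary = []
for year in sorted(groups):  # ORDER BY year(date)
    opens = [o for o, _ in groups[year]]
    closes = [c for _, c in groups[year]]
    summary.append({
        "year": year,
        "avgOpen": sum(opens) / len(opens),  # avg(open)
        # round(avg(close)); Python rounds exact .5 ties to even, whereas
        # Spark's round() rounds them half up, so ties may differ.
        "avgClose": float(round(sum(closes) / len(closes))),
        "mxOpen": max(opens),  # max(open)
    })

for line in summary:
    print(line)
```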


In [50]:
# Clean up in case the output directory was already created
#!hadoop fs -rm precioPromAnual_csv/* 
#!hadoop fs -rmdir precioPromAnual_csv

precioPromAnual.write.csv('precioPromAnual_csv', header=True)
# More options at: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.csv.html

In [51]:
# Read the part files of the result, concatenate them, and store them in a single local file
!hadoop fs -cat 'precioPromAnual_csv/*' > precioPromAnual.csv
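
Because the result was written with `header=True`, Spark normally puts the header line in every part file, so a plain `-cat` can repeat it in the consolidated file whenever there is more than one part. The sketch below merges part files while keeping a single header. It assumes the part files were first copied to a local directory (for example with `hadoop fs -get`); the function `merge_part_files` and the demo file names are hypothetical.

```python
import glob
import os
import tempfile

def merge_part_files(parts_dir, target_path):
    """Concatenate part-* CSV files, writing the header line only once."""
    header_written = False
    with open(target_path, "w") as target:
        # Sorting the names keeps the parts in the order Spark numbered them.
        for part in sorted(glob.glob(os.path.join(parts_dir, "part-*"))):
            with open(part) as source:
                lines = source.readlines()
            if not lines:
                continue  # skip empty part files
            if not header_written:
                target.write(lines[0])
                header_written = True
            target.writelines(lines[1:])

# Demo with two hypothetical part files; the rows come from the table shown above.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "part-00000.csv"), "w") as f:
    f.write("year,avgOpen,avgClose,mxOpen\n1950,18.397269145551935,18.0,20.43\n")
with open(os.path.join(demo_dir, "part-00001.csv"), "w") as f:
    f.write("year,avgOpen,avgClose,mxOpen\n1951,22.321887594629004,22.0,23.85\n")

merged_path = os.path.join(demo_dir, "merged.csv")
merge_part_files(demo_dir, merged_path)
with open(merged_path) as f:
    merged_text = f.read()
print(merged_text)
```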

In [55]:
# Store a query result as a table
precioPromAnual.write.format("orc").saveAsTable("preciosPromediosAnuales")
ContextoSql.sql("SHOW TABLES").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|preciospromediosa...|      false|
| default|               sp500|      false|
| default|         sp500_crudo|      false|
+--------+--------------------+-----------+



In [65]:
# Use the table just created
promedios = ContextoSql.table('preciosPromediosAnuales')
promedios.orderBy(F.col("year").desc()).show(10)
promedios.orderBy(F.col("year").asc()).show(10)


+----+------------------+--------+-------+
|year|           avgOpen|avgClose| mxOpen|
+----+------------------+--------+-------+
|2018|2754.1532508415903|  2752.0|2936.76|
|2017| 2448.275887660296|  2449.0|2692.71|
|2016| 2094.091549827939|  2095.0|2270.54|
|2015|2061.2680160280256|  2061.0|2130.36|
|2014|1930.7544851151724|  1931.0|2088.49|
|2013|1642.2986488947793|  1644.0|1842.97|
|2012|1378.6806381835938|  1379.0|1465.42|
|2011|1267.6182105654761|  1268.0|1365.21|
|2010|1139.3697606646826|  1140.0|1259.44|
|2009| 947.0220634823754|   948.0|1128.55|
+----+------------------+--------+-------+
only showing top 10 rows

+----+------------------+--------+------+
|year|           avgOpen|avgClose|mxOpen|
+----+------------------+--------+------+
|1950|18.397269145551935|    18.0| 20.43|
|1951|22.321887594629004|    22.0| 23.85|
|1952|24.496160057067872|    24.0| 26.59|
|1953|24.722589675173815|    25.0| 26.66|
|1954|29.724087359413268|    30.0| 35.98|
|1955| 40.49884920271616|    40.0| 4

In [25]:
# Now we use the ts.flint library
import ts.flint
from ts.flint import FlintContext

# Create a Flint context
# and tell Flint which Spark SQL context to use
ContextoFlint = FlintContext(ContextoSql)

# Use the Flint context to read the table as a time series
# (Flint expects the timestamp column to be named 'time')
tSerie = ContextoFlint.read.dataframe(ContextoSql.table('sp500').withColumnRenamed('Date', 'time'))

print(type(tSerie))

tSerie_t1 = tSerie.withColumn('rendimiento', 10000 * (tSerie['Close'] - tSerie['Open']) / tSerie['Open']).select('time', 'rendimiento')
tSerie_t1.show(5)

# On some days the closing price equals the opening price
# Show only the rows with a nonzero return
tSerie_t1.filter(tSerie_t1['rendimiento'] != 0).show(5)

# More information and examples at
# https://github.com/twosigma/flint

<class 'ts.flint.dataframe.TimeSeriesDataFrame'>
+-------------------+-----------+
|               time|rendimiento|
+-------------------+-----------+
|1950-01-03 00:00:00|        0.0|
|1950-01-04 00:00:00|        0.0|
|1950-01-05 00:00:00|        0.0|
|1950-01-06 00:00:00|        0.0|
|1950-01-09 00:00:00|        0.0|
+-------------------+-----------+
only showing top 5 rows

+-------------------+-------------------+
|               time|        rendimiento|
+-------------------+-------------------+
|1962-01-02 00:00:00| -82.46036756299341|
|1962-01-03 00:00:00|  23.95690123148708|
|1962-01-04 00:00:00| -68.88765225405015|
|1962-01-05 00:00:00|-138.73098974978308|
|1962-01-08 00:00:00| -77.51950895002626|
+-------------------+-------------------+
only showing top 5 rows



# Further recommended reading
* Advanced Analytics with Spark: Patterns for Learning from Data at Scale, by Sandy Ryza, Uri Laserson, Sean Owen, and Josh Wills
* [How to run pyspark in a Jupyter Notebook](https://www.hackdeploy.com/how-to-run-pyspark-in-a-jupyter-notebook/)
* [pyts: A Python Package for Time Series Classification](https://hal.inria.fr/hal-02883389/document)