En el siguiente notebook vamos a trabajar con PySpark. En él, vamos a usar las transformaciones y las acciones para analizar un pequeño conjunto de datos.

El primer paso que debemos dar es conectar Google Colab con nuestro Google Drive. Para ello, lanzaremos el siguiente trozo de código.

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!ls

drive  sample_data


Una vez hemos conectado Google Drive, pasamos a instalar Apache Spark en nuestro notebook. En concreto, vamos a trabajar con la versión 3.5.0.

In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [4]:
!wget -q http://apache.mirrors.pair.com/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

In [5]:
!ls


drive  sample_data  spark-3.5.0-bin-hadoop3.tgz


In [6]:
!tar xf spark-3.5.0-bin-hadoop3.tgz

Además de la instalación habitual, hay un paso más que debemos dar, y es la instalación de la librería *findspark*. Esta librería nos permitirá encontrar la instalación de Apache Spark en nuestro sistema.

Establecemos también un par de variables de entorno.

In [7]:
!pip install -q findspark

In [8]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

In [9]:
import findspark
findspark.init()

In [10]:
findspark.find()

'/content/spark-3.5.0-bin-hadoop3'

Una vez tenemos instalado nuestro framework, es momento de crear nuestra variable SparkSession, de la cual parten la mayoría de funcionalidades de SparkSQL

In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Pyspark_SQL")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [12]:
spark

### Lectura de datos

Ahora sí, podemos comenzar a trabajar con SparkSQL. Comenzaremos leyendo nuestro conjunto de datos, el cual contiene información de valoraciones de restaurantes.

In [13]:
!pwd

/content


In [14]:
import pandas as pd

data_path = '/content/drive/MyDrive/TokioSchool/'

data = spark.read.options(inferSchema='True',delimiter=',', header=True).csv (data_path + 'CSV_stocks_2021.csv')


In [15]:
data

DataFrame[ticker: string, open: double, high: double, low: double, close: double, volume: int, dividends: double, stock splits: int, date: date, ccy: string]

### Análisis estadísico

In [16]:
data.printSchema()

root
 |-- ticker: string (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- dividends: double (nullable = true)
 |-- stock splits: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- ccy: string (nullable = true)



# Ejercicio: Aprendizaje supervisado

Obtención de datos

*   a) Filtraremos las filas con valor ‘BA’ en la columna ‘ticker’


In [17]:
# Filtraremos las filas que correspondan con el valor ‘HON’ de la columna ‘ticker’
data.filter(data.ticker == 'BA').show()


+------+------------------+------------------+------------------+------------------+--------+---------+------------+----------+---+
|ticker|              open|              high|               low|             close|  volume|dividends|stock splits|      date|ccy|
+------+------------------+------------------+------------------+------------------+--------+---------+------------+----------+---+
|    BA|216.24000549316406|216.89999389648438| 212.6999969482422|214.05999755859375|10487600|      0.0|           0|2020-12-31|USD|
|    BA|             210.0| 210.1999969482422|202.49000549316406|202.72000122070312|21225600|      0.0|           0|2021-01-04|USD|
|    BA|204.74000549316406|213.35000610351562|204.60000610351562| 211.6300048828125|19338300|      0.0|           0|2021-01-05|USD|
|    BA|210.22000122070312|215.61000061035156|209.33999633789062|211.02999877929688|16202200|      0.0|           0|2021-01-06|USD|
|    BA|213.38999938964844|216.60000610351562|211.77999877929688| 212.710006

*   b) Importaremos los objetos DecisionTreeClassifier y VectorAssembler del módulo PySpark.ml.

In [18]:
# Importamos decisiontreeclassifier
from sklearn.tree import DecisionTreeClassifier

In [19]:
# Importamos vectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

c) Modificaremos el tipo de datos de las columnas open, low y close a float.

In [20]:
data2 = data.withColumn("Open",
                                data["Open"]
                                  .cast('float'))
data2.printSchema()

data3 = data2.withColumn("low",
                                data["low"]
                                  .cast('float'))
data3.printSchema()
data4 = data3.withColumn("close",
                                data["close"]
                                  .cast('float'))
data4.printSchema()

root
 |-- ticker: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- dividends: double (nullable = true)
 |-- stock splits: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- ccy: string (nullable = true)

root
 |-- ticker: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- high: double (nullable = true)
 |-- low: float (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- dividends: double (nullable = true)
 |-- stock splits: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- ccy: string (nullable = true)

root
 |-- ticker: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- high: double (nullable = true)
 |-- low: float (nullable = true)
 |-- close: float (nullable = true)
 |-- volume: integer (nullable = true)
 |-- dividends: 

d) Importamos los objetos DecisionTreeClassifier y VectorAssembler del módulo PySpark.ml
Crearemos una columna ‘features’ utilizando las columnas open, low y close mediante VectorAssembler.

In [21]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler


In [22]:
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

In [23]:
df = data.withColumn("open", col("open").cast(FloatType()))
df = data.withColumn("low", col("low").cast(FloatType()))
df = data.withColumn("close", col("close").cast(FloatType()))

In [24]:
assembler = VectorAssembler(inputCols=["open", "low", "close"], outputCol="features")
assembler_df = assembler.transform(df)



e) Convertiremos la columna ‘ticker’ a tipo numérico, utilizando StringIndexer y llamarla ‘label’.


In [25]:
# Importamos StringIndexer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline



In [26]:
# # Convertiremos la columna ‘ticker’ a tipo numérico, utilizando StringIndexer y llamarla ‘label’.

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="ticker", outputCol="label")
df_with_label = indexer.fit(assembler_df).transform(assembler_df)
df_with_label.show()

+------+------------------+------------------+------------------+---------+-------+---------+------------+----------+---+--------------------+-----+
|ticker|              open|              high|               low|    close| volume|dividends|stock splits|      date|ccy|            features|label|
+------+------------------+------------------+------------------+---------+-------+---------+------------+----------+---+--------------------+-----+
|   HON|207.45501408335315|209.42141232111007|206.14735745495346|209.12645|1406400|      0.0|           0|2020-12-31|USD|[207.455014083353...| 13.0|
|   HON|209.26409612607986|209.43123817541044| 202.8929700588177|204.45625|2328900|      0.0|           0|2021-01-04|USD|[209.264096126079...| 13.0|
|   HON|203.50255708426138| 206.6586329757187|203.50255708426138| 204.9577|2172100|      0.0|           0|2021-01-05|USD|[203.502557084261...| 13.0|
|   HON|205.93106042557045|210.38495140642195|205.71475541084416|208.69385|2747900|      0.0|           0|

# Ejercicio: Aprendizaje supervisado

a) Crearemos el objeto DecisionTreeClassifier



In [27]:
# Importamos decisiontreeclassifier y numpy
from sklearn.tree import DecisionTreeClassifier
import numpy as np
from sklearn.model_selection import train_test_split
from pyspark.ml.feature import IndexToString

In [28]:
# Entrenamos el modelo
(train, test) = df_with_label.randomSplit([0.2, 0.8])

In [29]:
# Crear el objeto IndexToString
index_to_string = IndexToString(inputCol="label", outputCol="original_ticker")

# Transformar las predicciones para obtener los valores originales de 'ticker'
df_with_original_ticker = index_to_string.transform(df_with_label)



In [None]:
dtc = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dtc = dtc.fit(train)

In [None]:
#Uniremos mediante un inner join, el dataframe con las predicciones y el que contiene los valores reestablecidos de la columna ‘ticker’.
pred = dtc.transform(test)
finalResult = pred.join(df_with_original_ticker, on="ticker", how="inner")
