# Ejercicios Spark DataFrames

Vamos a practicar un poco con tus nuevas habilidades de Spark DataFrame, se te harán algunas preguntas básicas sobre algunos datos del mercado de valores, en este caso Walmart Stock de los años 2012-2017.

Responde a las preguntas y completa las tareas de abajo.

#### ¡Utiliza el archivo walmart_stock.csv para responder y completar las tareas siguientes!

#### Iniciar una sesión de Spark

In [None]:
!pip install pyspark

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()



#### Cargar el archivo CSV de Walmart Stock, hacer que Spark infiera los tipos

1.   Elemento de lista
2.   Elemento de lista

de datos.

In [None]:
#Importamos el drive
from google.colab import drive
drive.mount('/content/drive')
#Ruta del archivo CSV en google drive
csv_path = '/content/drive/MyDrive/Colab Notebooks/walmart_stock.csv'
#Cargamos los datos en un dataFrame de Spark con inferencia de esquema
df = spark.read.csv(csv_path, header=True, inferSchema=True)
#Muestro las 5 primeras filas para ver que funciona
df.show(5)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+

¿Cuáles son los nombres de las columnas?

In [None]:
column_names = df.columns
print(column_names)

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


#### ¿Qué aspecto tiene el esquema?

In [None]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



#### Imprime las 5 primeras columnas.

In [None]:
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



#### Utiliza describe() para conocer el DataFrame.

In [None]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

#### Hay demasiados decimales para la media y el stddev en describe(). Formatea los números para que sólo se muestren hasta dos decimales. Presta atención a los tipos de datos que devuelve .describe()

In [None]:
#obtener el resumen estadistico
sumary_df = df.describe()
#convertir el DataFrame de Pyspark a Pandas para la manipulacion
pandas_sumary_df = sumary_df.toPandas()
#Formatear los numeros a dos decimales
numeric_stats = pandas_sumary_df[pandas_sumary_df['summary'].isin(['mean', 'stddev'])]
#Formatear las columnas numericas a dos decimales
for colum in numeric_stats.columns[1:]:
  pandas_sumary_df[colum] = pandas_sumary_df[colum].apply(lambda x: f"{x:.2f}" if isinstance(x, (int, float)) else x)
print(pandas_sumary_df)

  summary                Open               High                Low  \
0   count                1258               1258               1258   
1    mean   72.35785375357709  72.83938807631165   71.9186009594594   
2  stddev    6.76809024470826  6.768186808159218  6.744075756255496   
3     min  56.389998999999996          57.060001          56.299999   
4     max           90.800003          90.970001              89.25   

               Close             Volume          Adj Close  
0               1258               1258               1258  
1  72.38844998012726  8222093.481717011  67.23883848728146  
2  6.756859163732991    4519780.8431556  6.722609449996857  
3          56.419998            2094900          50.363689  
4          90.470001           80898100  84.91421600000001  


#### Crea un nuevo dataframe con una columna llamada HV Ratio que es la relación entre el precio máximo y el volumen de las acciones negociadas durante un día.

In [None]:
df_hv_ratio = df.withColumn("HV Ratio", df["High"] / df["Volume"])
df_hv_ratio.select("HV Ratio").show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



#### ¿Qué día hubo el pico máximo en el precio?

In [None]:
max_high_day = df.orderBy(df["High"].desc()).first()
print(max_high_day["Date"])

2015-01-13


#### ¿Cuál es la media de la columna Close?

In [None]:
mean_close = df.agg({"Close": "avg"}).collect()[0][0]
print(mean_close)

72.38844998012726


#### ¿Cuál es el máximo y el mínimo de la columna Volumen?

In [None]:
import pyspark.sql.functions as F
#calculamos el maximo y minimo de la columna Volume
volume_stats = df.agg(
    F.max('Volume'),
    F.min('Volume')
).collect()[0]
#Obtenemos los valores del resultado
max_volume = volume_stats[0]
min_volume = volume_stats[1]
#Mostramos los resultados
print(f"Max Volume: {max_volume}")
print(f"Min Volume: {min_volume}")

Max Volume: 80898100
Min Volume: 2094900


+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



#### ¿Cuántos días estuvo el cierre por debajo de los 60 dólares?

In [None]:
days_below_60 = df.filter(df["Close"] < 60).count()
print(days_below_60)

81


#### ¿Qué porcentaje de veces el Máximo fue superior a 80 dólares?
#### En otras palabras, (Número de días de máximos>80)/(Días totales en el conjunto de datos)

In [None]:
days_high_80 = df.filter(df["High"] > 80).count()
total_days = df.count()
percentage_high_80 = (days_high_80 / total_days) * 100
print(percentage_high_80)

9.141494435612083


#### ¿Cuál es la correlación de Pearson entre High y Volume?

In [None]:
correlation = df.corr("High", "Volume")
print(correlation)

-0.3384326061737161


#### ¿Cuál es el valor máximo de High por año?

In [None]:
df_with_year = df.withColumn("Year", F.year("Date"))
max_high_per_year = df_with_year.groupBy("Year").agg(F.max("High").alias("MaxHigh"))
max_high_per_year.show()

+----+---------+
|Year|  MaxHigh|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



#### ¿Cuál es el cierre medio de cada mes del calendario?
#### En otras palabras, a lo largo de todos los años, ¿cuál es el precio medio de cierre para enero, febrero, marzo, etc.? Su resultado tendrá un valor para cada uno de estos meses.

In [None]:
import pyspark.sql.functions as F
#Convertimos la Columna Date
df = df.withColumn('Date', df['Date'].cast('date'))
#Extraemos el mes de la Columna Date
df_with_month = df.withColumn("Month", F.month("Date"))
#Agrupamos por Month y calculamos el precio medio de cierre por mes
mean_close_per_month = df_with_month.groupBy("Month").agg(
    F.avg("Close").alias("MeanClose"))
mean_close_per_month.show()

+-----+-----------------+
|Month|        MeanClose|
+-----+-----------------+
|   12|72.84792478301885|
|    1|71.44801958415842|
|    6| 72.4953774245283|
|    3|71.77794377570092|
|    5|72.30971688679247|
|    9|72.18411785294116|
|    4|72.97361900952382|
|    8|73.02981855454546|
|    7|74.43971943925233|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|    2|  71.306804443299|
+-----+-----------------+

