<a href="https://colab.research.google.com/github/SilvanaJ90/udemy_data_engineer/blob/main/SQL_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=460083112d6a5338f35681422910c07d3e159fd7303a96b796aee93ffdcd1062
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# **Spark SQL DataFrame**

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Dataframe').getOrCreate()

In [4]:
df = spark.read.json('/content/drive/MyDrive/dataset/personas.json')

In [5]:
df.show()

+----+------+
|edad|nombre|
+----+------+
|NULL|Miguel|
|  25|Carlos|
|  19|  Juan|
+----+------+



In [6]:
df.printSchema()

root
 |-- edad: long (nullable = true)
 |-- nombre: string (nullable = true)



In [7]:
df.columns #columnas

['edad', 'nombre']

In [8]:
df.describe().show()

+-------+-----------------+------+
|summary|             edad|nombre|
+-------+-----------------+------+
|  count|                2|     3|
|   mean|             22.0|  NULL|
| stddev|4.242640687119285|  NULL|
|    min|               19|Carlos|
|    max|               25|Miguel|
+-------+-----------------+------+



**Schema**

In [9]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [10]:
schema = [StructField('nombre', StringType(), True),
          StructField('edad', IntegerType(), True)]

In [11]:
schema_final = StructType(fields=schema)

In [12]:
df = spark.read.json('/content/drive/MyDrive/dataset/personas.json', schema=schema_final)

In [13]:
df.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- edad: integer (nullable = true)



In [14]:
df.show()

+------+----+
|nombre|edad|
+------+----+
|Miguel|NULL|
|Carlos|  25|
|  Juan|  19|
+------+----+



**Select**

In [15]:
df['edad']

Column<'edad'>

In [16]:
df.select('edad').show()

+----+
|edad|
+----+
|NULL|
|  25|
|  19|
+----+



In [17]:
type(df.select('edad'))

In [18]:
df.head(2)[0]

Row(nombre='Miguel', edad=None)

In [19]:
df.select(['nombre', 'edad']).show()

+------+----+
|nombre|edad|
+------+----+
|Miguel|NULL|
|Carlos|  25|
|  Juan|  19|
+------+----+



**Crear columna nueva** con withColumn()

In [20]:
df.withColumn('nueva_columna', df['edad']+1).show()

+------+----+-------------+
|nombre|edad|nueva_columna|
+------+----+-------------+
|Miguel|NULL|         NULL|
|Carlos|  25|           26|
|  Juan|  19|           20|
+------+----+-------------+



**Cambiar el nombre de la columna con WithRename()**

In [21]:
df.withColumnRenamed('nombre', 'nombre_completo').show()

+---------------+----+
|nombre_completo|edad|
+---------------+----+
|         Miguel|NULL|
|         Carlos|  25|
|           Juan|  19|
+---------------+----+



Utilizar el lenguaje SQL para crear una consulta

In [22]:
df.createOrReplaceTempView('personas')

In [23]:
query = spark.sql(" SELECT * FROM personas")

In [24]:
query.show()

+------+----+
|nombre|edad|
+------+----+
|Miguel|NULL|
|Carlos|  25|
|  Juan|  19|
+------+----+



In [25]:
mayor_20 = spark.sql("SELECT * FROM personas WHERE edad > 20")

In [26]:
mayor_20.show()

+------+----+
|nombre|edad|
+------+----+
|Carlos|  25|
+------+----+



**Spark SQL filter() o where()**

In [27]:
from pyspark.sql import SparkSession

In [28]:
spark = SparkSession.builder.appName('filter').getOrCreate()

In [29]:
df = spark.read.csv('/content/drive/MyDrive/dataset/AAPL.csv', inferSchema=True, header=True)

In [30]:
df.show()

+----------+---------+---------+---------+---------+---------+---------+
|      Date|     Open|     High|      Low|    Close|Adj Close|   Volume|
+----------+---------+---------+---------+---------+---------+---------+
|2010-01-04|    30.49|30.642857|    30.34|30.572857|26.466835|123432400|
|2010-01-05|30.657143|30.798571|30.464285|30.625713|26.512596|150476200|
|2010-01-06|30.625713|30.747143|30.107143|30.138571|26.090879|138040000|
|2010-01-07|    30.25|30.285715|29.864286|30.082857|26.042646|119282800|
|2010-01-08|30.042856|30.285715|29.865715|30.282858|26.215786|111902700|
|2010-01-11|     30.4|30.428572|29.778572|30.015715|25.984528|115557400|
|2010-01-12|29.884285|29.967142|29.488571|29.674286|25.688946|148614900|
|2010-01-13|29.695715|30.132856|29.157143|30.092857|26.051304|151473000|
|2010-01-14|30.015715|30.065714|29.860001|29.918571|25.900436|108223500|
|2010-01-15|30.132856|30.228571|    29.41|29.418571|25.467583|148516900|
|2010-01-19|29.761429|30.741428|29.605715|30.719999

In [31]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [32]:
df.head(5)[0]

Row(Date=datetime.date(2010, 1, 4), Open=30.49, High=30.642857, Low=30.34, Close=30.572857, Adj Close=26.466835, Volume=123432400)

In [33]:
df.filter(df['Close'] < 30).select(['Open', 'Close']).show()

+---------+---------+
|     Open|    Close|
+---------+---------+
|29.884285|29.674286|
|30.015715|29.918571|
|30.132856|29.418571|
|30.297142|29.724285|
|29.540001|    28.25|
|    28.93|    29.01|
|29.421429|    29.42|
|29.549999|29.697144|
|29.275715|28.469999|
|28.725714|27.437143|
|27.481428|27.818571|
|27.987143|    27.98|
|27.881428|28.461428|
|28.104286|27.435715|
|27.518572|27.922857|
|27.955715|27.731428|
|28.059999|28.027143|
|27.984285|27.874287|
|    27.84|28.381428|
|28.301428|28.625713|
+---------+---------+
only showing top 20 rows



In [34]:
df.where((df['Close'] < 30) | (df['Open'] < 30)).select(['Open', 'Close']).show()

+---------+---------+
|     Open|    Close|
+---------+---------+
|29.884285|29.674286|
|29.695715|30.092857|
|30.015715|29.918571|
|30.132856|29.418571|
|29.761429|30.719999|
|30.297142|29.724285|
|29.540001|    28.25|
|    28.93|    29.01|
|29.421429|    29.42|
|29.549999|29.697144|
|29.275715|28.469999|
|28.725714|27.437143|
|27.481428|27.818571|
|27.987143|    27.98|
|27.881428|28.461428|
|28.104286|27.435715|
|27.518572|27.922857|
|27.955715|27.731428|
|28.059999|28.027143|
|27.984285|27.874287|
+---------+---------+
only showing top 20 rows



In [35]:
df.where((df['Close'] < 30) & (df['Open'] < 30)).select(['Open', 'Close']).show()

+---------+---------+
|     Open|    Close|
+---------+---------+
|29.884285|29.674286|
|29.540001|    28.25|
|    28.93|    29.01|
|29.421429|    29.42|
|29.549999|29.697144|
|29.275715|28.469999|
|28.725714|27.437143|
|27.481428|27.818571|
|27.987143|    27.98|
|27.881428|28.461428|
|28.104286|27.435715|
|27.518572|27.922857|
|27.955715|27.731428|
|28.059999|28.027143|
|27.984285|27.874287|
|    27.84|28.381428|
|28.301428|28.625713|
|28.848572|29.057142|
|    29.17|28.935715|
|28.804285|    28.99|
+---------+---------+
only showing top 20 rows



In [36]:
df.where((df['Close'] < 30) & ~ (df['Open'] < 30)).select(['Open', 'Close']).show()

+---------+---------+
|     Open|    Close|
+---------+---------+
|30.015715|29.918571|
|30.132856|29.418571|
|30.297142|29.724285|
+---------+---------+



In [37]:
df.filter(df['Low'] == 30.34).show()

+----------+-----+---------+-----+---------+---------+---------+
|      Date| Open|     High|  Low|    Close|Adj Close|   Volume|
+----------+-----+---------+-----+---------+---------+---------+
|2010-01-04|30.49|30.642857|30.34|30.572857|26.466835|123432400|
+----------+-----+---------+-----+---------+---------+---------+



In [38]:
resultado = df.filter(df['Low'] == 30.34).collect()

In [39]:
fila = resultado[0]

In [40]:
fila.asDict()['Volume']

123432400

# **Spark SQL Agrupaciones**

In [41]:
from pyspark.sql import SparkSession

In [42]:
spark = SparkSession.builder.appName('agrupaciones').getOrCreate()

In [43]:
df = spark.read.csv('/content/drive/MyDrive/dataset/ventas.csv', inferSchema=True, header=True)

In [44]:
df.show()

+-------+-------+------+
|Empresa|Persona|Ventas|
+-------+-------+------+
|   GOOG| Carlos|   200|
|   GOOG|   Juan|   120|
|   GOOG| Felipe|   340|
|   MSFT|   Tina|   600|
|   MSFT| Andrea|   124|
|   MSFT|  Carla|   243|
|     FB|   Sara|   870|
|     FB|Ignacio|   350|
|   APPL| Miguel|   250|
|   APPL|  Oscar|   130|
|   APPL|  Jorge|   750|
|   APPL|   Ivan|   350|
+-------+-------+------+



In [45]:
df.printSchema()

root
 |-- Empresa: string (nullable = true)
 |-- Persona: string (nullable = true)
 |-- Ventas: integer (nullable = true)



**groupBy()**

In [46]:
df.groupBy('Empresa').sum().show()

+-------+-----------+
|Empresa|sum(Ventas)|
+-------+-----------+
|   APPL|       1480|
|   GOOG|        660|
|     FB|       1220|
|   MSFT|        967|
+-------+-----------+



**agg()**

In [47]:
df.agg({'Ventas':'sum'}).show()

+-----------+
|sum(Ventas)|
+-----------+
|       4327|
+-----------+



In [48]:
agrupado = df.groupBy('Empresa')

In [49]:
agrupado.agg({'Ventas':'sum'}).show()

+-------+-----------+
|Empresa|sum(Ventas)|
+-------+-----------+
|   APPL|       1480|
|   GOOG|        660|
|     FB|       1220|
|   MSFT|        967|
+-------+-----------+



**Otras funciones**

In [50]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [51]:
df.select(countDistinct('Ventas')).show()

+----------------------+
|count(DISTINCT Ventas)|
+----------------------+
|                    11|
+----------------------+



In [52]:
df.select(stddev('Ventas').alias('Media de ventas')).show()

+------------------+
|   Media de ventas|
+------------------+
|250.08742410799007|
+------------------+



In [53]:
from pyspark.sql.functions import format_number

In [54]:
ventas_stdev = df.select(stddev('Ventas'))

In [55]:
ventas_stdev.show()

+------------------+
|    stddev(Ventas)|
+------------------+
|250.08742410799007|
+------------------+



In [56]:
ventas_stdev.select(format_number('stddev(Ventas)', 2).alias('stdev de ventas')).show()

+---------------+
|stdev de ventas|
+---------------+
|         250.09|
+---------------+



**orderBy()**

In [57]:
df.orderBy('Ventas').show()

+-------+-------+------+
|Empresa|Persona|Ventas|
+-------+-------+------+
|   GOOG|   Juan|   120|
|   MSFT| Andrea|   124|
|   APPL|  Oscar|   130|
|   GOOG| Carlos|   200|
|   MSFT|  Carla|   243|
|   APPL| Miguel|   250|
|   GOOG| Felipe|   340|
|     FB|Ignacio|   350|
|   APPL|   Ivan|   350|
|   MSFT|   Tina|   600|
|   APPL|  Jorge|   750|
|     FB|   Sara|   870|
+-------+-------+------+



In [58]:
df.orderBy(df['Ventas'].desc()).show()

+-------+-------+------+
|Empresa|Persona|Ventas|
+-------+-------+------+
|     FB|   Sara|   870|
|   APPL|  Jorge|   750|
|   MSFT|   Tina|   600|
|     FB|Ignacio|   350|
|   APPL|   Ivan|   350|
|   GOOG| Felipe|   340|
|   APPL| Miguel|   250|
|   MSFT|  Carla|   243|
|   GOOG| Carlos|   200|
|   APPL|  Oscar|   130|
|   MSFT| Andrea|   124|
|   GOOG|   Juan|   120|
+-------+-------+------+



# **Spark SQL Valores Nulos**

In [59]:
from pyspark.sql import SparkSession

In [60]:
spark = SparkSession.builder.appName('nulos').getOrCreate()

In [61]:
df = spark.read.csv('/content/drive/MyDrive/dataset/Null.csv', inferSchema=True, header=True)

In [62]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- Ventas: integer (nullable = true)
 |-- Clientes: integer (nullable = true)



In [63]:
df.show()

+----+------+------+--------+
|  Id|Nombre|Ventas|Clientes|
+----+------+------+--------+
|emp1|  John|  NULL|       3|
|emp2|  NULL|  NULL|    NULL|
|emp3|  NULL|   345|    NULL|
|emp4| Cindy|   456|       4|
+----+------+------+--------+



In [64]:
df.na.drop().show() #quita cualquier fila que tenga nulo

+----+------+------+--------+
|  Id|Nombre|Ventas|Clientes|
+----+------+------+--------+
|emp4| Cindy|   456|       4|
+----+------+------+--------+



In [65]:
df.na.drop(thresh=4).show() #quita cualquier fila que tenga dos nulos

+----+------+------+--------+
|  Id|Nombre|Ventas|Clientes|
+----+------+------+--------+
|emp4| Cindy|   456|       4|
+----+------+------+--------+



In [66]:
df.na.drop(how='all').show() #quita cualquier fila que tenga todos los nulos

+----+------+------+--------+
|  Id|Nombre|Ventas|Clientes|
+----+------+------+--------+
|emp1|  John|  NULL|       3|
|emp2|  NULL|  NULL|    NULL|
|emp3|  NULL|   345|    NULL|
|emp4| Cindy|   456|       4|
+----+------+------+--------+



In [67]:
df.na.drop(subset=['Clientes']).show() #quita cualquier fila que tenga nulo en la columna edad

+----+------+------+--------+
|  Id|Nombre|Ventas|Clientes|
+----+------+------+--------+
|emp1|  John|  NULL|       3|
|emp4| Cindy|   456|       4|
+----+------+------+--------+



**Rellenar = fill()**

In [68]:
df.na.fill(0).show()

+----+------+------+--------+
|  Id|Nombre|Ventas|Clientes|
+----+------+------+--------+
|emp1|  John|     0|       3|
|emp2|  NULL|     0|       0|
|emp3|  NULL|   345|       0|
|emp4| Cindy|   456|       4|
+----+------+------+--------+



In [69]:
df.na.fill(0, subset=['Ventas']).show()

+----+------+------+--------+
|  Id|Nombre|Ventas|Clientes|
+----+------+------+--------+
|emp1|  John|     0|       3|
|emp2|  NULL|     0|    NULL|
|emp3|  NULL|   345|    NULL|
|emp4| Cindy|   456|       4|
+----+------+------+--------+



**Imputar la media**

In [70]:
from pyspark.sql.functions import mean

In [71]:
media = df.select(mean(df['Ventas'])).collect()

In [72]:
media[0][0]

400.5

In [73]:
df.na.fill(media[0][0], subset=['Ventas']).show()

+----+------+------+--------+
|  Id|Nombre|Ventas|Clientes|
+----+------+------+--------+
|emp1|  John|   400|       3|
|emp2|  NULL|   400|    NULL|
|emp3|  NULL|   345|    NULL|
|emp4| Cindy|   456|       4|
+----+------+------+--------+



# **Spark SQL Fecha y Tiempo**

In [73]:
from pyspark.sql import SparkSession

In [74]:
spark = SparkSession.builder.appName('fecha').getOrCreate()

In [75]:
df = spark.read.csv('/content/drive/MyDrive/dataset/AAPL.csv', inferSchema=True, header=True)

In [76]:
df.head(1)

[Row(Date=datetime.date(2010, 1, 4), Open=30.49, High=30.642857, Low=30.34, Close=30.572857, Adj Close=26.466835, Volume=123432400)]

In [77]:
df.show()

+----------+---------+---------+---------+---------+---------+---------+
|      Date|     Open|     High|      Low|    Close|Adj Close|   Volume|
+----------+---------+---------+---------+---------+---------+---------+
|2010-01-04|    30.49|30.642857|    30.34|30.572857|26.466835|123432400|
|2010-01-05|30.657143|30.798571|30.464285|30.625713|26.512596|150476200|
|2010-01-06|30.625713|30.747143|30.107143|30.138571|26.090879|138040000|
|2010-01-07|    30.25|30.285715|29.864286|30.082857|26.042646|119282800|
|2010-01-08|30.042856|30.285715|29.865715|30.282858|26.215786|111902700|
|2010-01-11|     30.4|30.428572|29.778572|30.015715|25.984528|115557400|
|2010-01-12|29.884285|29.967142|29.488571|29.674286|25.688946|148614900|
|2010-01-13|29.695715|30.132856|29.157143|30.092857|26.051304|151473000|
|2010-01-14|30.015715|30.065714|29.860001|29.918571|25.900436|108223500|
|2010-01-15|30.132856|30.228571|    29.41|29.418571|25.467583|148516900|
|2010-01-19|29.761429|30.741428|29.605715|30.719999

In [78]:
from pyspark.sql.functions import format_number, dayofmonth, hour, dayofyear, month, year, weekofyear, date_format

In [79]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [80]:
df.select(hour(df['Date'])).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [81]:
df.select(dayofyear(df['Date'])).show()

+---------------+
|dayofyear(Date)|
+---------------+
|              4|
|              5|
|              6|
|              7|
|              8|
|             11|
|             12|
|             13|
|             14|
|             15|
|             19|
|             20|
|             21|
|             22|
|             25|
|             26|
|             27|
|             28|
|             29|
|             32|
+---------------+
only showing top 20 rows



Ejercicio
Muestra un dataframe de la media de open por año

In [83]:
df.select(year(df['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [84]:
df_nuevo = df.withColumn('year', (year(df['Date'])))

In [85]:
df_nuevo.show()

+----------+---------+---------+---------+---------+---------+---------+----+
|      Date|     Open|     High|      Low|    Close|Adj Close|   Volume|year|
+----------+---------+---------+---------+---------+---------+---------+----+
|2010-01-04|    30.49|30.642857|    30.34|30.572857|26.466835|123432400|2010|
|2010-01-05|30.657143|30.798571|30.464285|30.625713|26.512596|150476200|2010|
|2010-01-06|30.625713|30.747143|30.107143|30.138571|26.090879|138040000|2010|
|2010-01-07|    30.25|30.285715|29.864286|30.082857|26.042646|119282800|2010|
|2010-01-08|30.042856|30.285715|29.865715|30.282858|26.215786|111902700|2010|
|2010-01-11|     30.4|30.428572|29.778572|30.015715|25.984528|115557400|2010|
|2010-01-12|29.884285|29.967142|29.488571|29.674286|25.688946|148614900|2010|
|2010-01-13|29.695715|30.132856|29.157143|30.092857|26.051304|151473000|2010|
|2010-01-14|30.015715|30.065714|29.860001|29.918571|25.900436|108223500|2010|
|2010-01-15|30.132856|30.228571|    29.41|29.418571|25.467583|14

In [86]:
resultado = df_nuevo.groupBy('year').mean().select(['year', 'avg(Open)'])

In [87]:
resultado.show()

+----+------------------+
|year|         avg(Open)|
+----+------------------+
|2018|189.11143456573691|
|2015|120.17575393253965|
|2013| 67.58973367460315|
|2014| 92.21983011507933|
|2019|207.86908675793657|
|2020|300.64314603225813|
|2012| 82.37896010000001|
|2016|104.50777772619044|
|2010| 37.13680271825399|
|2011|52.008775376984104|
|2017|150.44490045816727|
+----+------------------+



In [90]:
resultado.select('Year',format_number('avg(Open)', 2).alias('Media Open')).orderBy(resultado['Year']).show()

+----+----------+
|Year|Media Open|
+----+----------+
|2010|     37.14|
|2011|     52.01|
|2012|     82.38|
|2013|     67.59|
|2014|     92.22|
|2015|    120.18|
|2016|    104.51|
|2017|    150.44|
|2018|    189.11|
|2019|    207.87|
|2020|    300.64|
+----+----------+

