In [1]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=ccb73b058a1f591f636070121b0cbaa3187a063db72247f1baed56bef57f040f
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
# findspark.init() is called before importing pyspark so the Spark
# installation is located and made importable first.
import findspark
findspark.init()
import pyspark

# getOrCreate() reuses the active SparkContext when this cell is re-run;
# calling SparkContext() a second time in the same kernel raises an error
# because only one context may be active at a time.
sc = pyspark.SparkContext.getOrCreate()

In [4]:
from pyspark.sql import SparkSession

# Configure the session builder first, then obtain the session
# (getOrCreate() returns the existing session if there is one).
session_builder = SparkSession.builder.appName('Dataframes')
spark = session_builder.getOrCreate()

# Last expression of the cell: renders the session summary.
spark

## CREAR DATAFRAME DESDE CERO

In [5]:
# Sample rows, one tuple per person, in the field order used by the schema
# defined for them: (firstname, lastname, id, gender, salary).
# Some last names carry leading/trailing spaces and some values are None on
# purpose, so the later cleaning cells have something to fix.
datos = [
    (None, 'Smith   ', '36636', 'M', 3500),
    ('Michael', '   Rose', '40288', 'M', 4750),
    ('Robert', 'Williams', '42114', 'M', None),
    ('Maria', '    Jones    ', '39192', 'F', 4000),
]


In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema for the hand-built rows. Each add() call appends one field
# as (name, type, nullable); lastname and id are declared non-nullable.
esquema = (
    StructType()
    .add('firstname', StringType(), True)
    .add('lastname', StringType(), False)
    .add('id', StringType(), False)
    .add('gender', StringType(), True)
    .add('salary', IntegerType(), True)
)


In [7]:
# Build the DataFrame from the in-memory rows using the explicit schema.
df = spark.createDataFrame(data=datos, schema=esquema)

# Print the resulting column names, types and nullability.
df.printSchema()


root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = false)
 |-- id: string (nullable = false)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [8]:
# Print the DataFrame rows as a text table.
df.show()

+---------+-------------+-----+------+------+
|firstname|     lastname|   id|gender|salary|
+---------+-------------+-----+------+------+
|     NULL|     Smith   |36636|     M|  3500|
|  Michael|         Rose|40288|     M|  4750|
|   Robert|     Williams|42114|     M|  NULL|
|    Maria|    Jones    |39192|     F|  4000|
+---------+-------------+-----+------+------+



## CREAR DATAFRAME DESDE ARCHIVO (I)

In [9]:
# Path of the sales CSV loaded by the next cell.
# NOTE(review): absolute path; presumably a Colab upload location - confirm.
file = '/content/sales.csv'

In [10]:
# Read the CSV using its first line as the header and letting Spark infer
# the column types from the data.
sales_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", True)
    .load(file)
)

# Show the inferred schema.
sales_df.printSchema()


root
 |-- Order_ID: integer (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Ship_Date: string (nullable = true)



In [11]:
# Print the rows without truncating long cell values.
sales_df.show(truncate=False)

+---------+----------+---------------+----------+----------+---------------------------------+----------------------+----------+
|Order_ID |Order_Date|Item_Type      |Units_Sold|Unit_Price|Region                           |Country               |Ship_Date |
+---------+----------+---------------+----------+----------+---------------------------------+----------------------+----------+
|535113847|10/8/2014 |Snacks         |934       |152.58    |Middle East and North Africa     |Azerbaijan            |10/23/2014|
|874708545|2/22/2015 |Cosmetics      |4551      |437.2     |Central America and the Caribbean|Panama                |2/27/2015 |
|854349935|12/9/2015 |Fruits         |9986      |9.33      |Sub-Saharan Africa               |Sao Tome and Principe |1/18/2016 |
|892836844|9/17/2014 |Personal Care  |9118      |81.73     |Sub-Saharan Africa               |Sao Tome and Principe |10/12/2014|
|129280602|2/4/2010  |Household      |5858      |668.27    |Central America and the Caribbean|Bel

## CREAR DATAFRAME DESDE ARCHIVO (II)

In [12]:
from pyspark.sql.types import IntegerType, StringType, FloatType, ArrayType, DateType, BooleanType

# Explicit schema for the persons JSON file; every field is nullable.
# StructType comes from the earlier pyspark.sql.types import cell.
persons_schema = (
    StructType()
    .add('id', IntegerType(), True)
    .add('first_name', StringType(), True)
    .add('last_name', StringType(), True)
    .add('fav_movies', ArrayType(StringType()), True)
    .add('salary', FloatType(), True)
    .add('image_url', StringType(), True)
    .add('date_of_birth', DateType(), True)
    .add('active', BooleanType(), True)
)


In [13]:
# Path of the persons JSON loaded by the next cell.
# Note: this rebinds `file`, which previously pointed at the sales CSV.
file = '/content/persons.json'

In [14]:
# Read the JSON file with the explicit schema; the 'multiline' option is
# enabled so records may span several lines.
persons_df = (
    spark.read
    .format('json')
    .option('multiline', True)
    .schema(persons_schema)
    .load(file)
)

persons_df.show()


+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
|  1|     Drucy|      Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|
|  2|   Emelyne|      Blaza|[Musketeer, The, ...|3006.04|http://dummyimage...|   1991-11-02| false|
|  3|       Max|     Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|
|  4|    Ilario|       Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true|
|  5|     Toddy|     Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|
|  6|    Oswald|   Petrolli|[Wing and the Thi...|1153.23|http://dummyimage...|   1986-09-02| false|
|  7|    Adrian|     Clarey|[Walking Tall, Pa...|1044.73|http://dummyimage...|   1971-08-24| false|


In [15]:
# NOTE(review): in the captured run this only rendered the DataFrame's
# column/type summary, not its rows; use persons_df.show() to print rows.
display(persons_df)

DataFrame[id: int, first_name: string, last_name: string, fav_movies: array<string>, salary: float, image_url: string, date_of_birth: date, active: boolean]

### CONSULTA DE DATOS DF

In [16]:
# Project a subset of columns by name and print the first 10 rows untruncated.
(
    sales_df
    .select('Order_ID', 'Item_Type', 'Units_Sold', 'Unit_Price', 'Country')
    .show(10, truncate=False)
)


+---------+-------------+----------+----------+---------------------+
|Order_ID |Item_Type    |Units_Sold|Unit_Price|Country              |
+---------+-------------+----------+----------+---------------------+
|535113847|Snacks       |934       |152.58    |Azerbaijan           |
|874708545|Cosmetics    |4551      |437.2     |Panama               |
|854349935|Fruits       |9986      |9.33      |Sao Tome and Principe|
|892836844|Personal Care|9118      |81.73     |Sao Tome and Principe|
|129280602|Household    |5858      |668.27    |Belize               |
|473105037|Clothes      |1149      |109.28    |Denmark              |
|754046475|Cosmetics    |7964      |437.2     |Germany              |
|772153747|Fruits       |6307      |9.33      |Turkey               |
|847788178|Snacks       |8217      |152.58    |United Kingdom       |
|471623599|Cosmetics    |2758      |437.2     |Kazakhstan           |
+---------+-------------+----------+----------+---------------------+
only showing top 10 

In [17]:
# Same kind of projection, referencing the column as a DataFrame attribute.
(
    sales_df
    .select(sales_df.Order_ID)
    .show(10, truncate=False)
)

+---------+
|Order_ID |
+---------+
|535113847|
|874708545|
|854349935|
|892836844|
|129280602|
|473105037|
|754046475|
|772153747|
|847788178|
|471623599|
+---------+
only showing top 10 rows



In [18]:
from pyspark.sql.functions import col, expr

# Select two columns via col() plus a computed column written as a SQL
# expression and aliased TOTAL_PRICE.
(
    sales_df
    .select(
        col('Order_ID'),
        col('Item_Type'),
        expr("Units_Sold * Unit_Price as TOTAL_PRICE"),
    )
    .show(10)
)


+---------+-------------+------------------+
| Order_ID|    Item_Type|       TOTAL_PRICE|
+---------+-------------+------------------+
|535113847|       Snacks|         142509.72|
|874708545|    Cosmetics|         1989697.2|
|854349935|       Fruits|          93169.38|
|892836844|Personal Care|         745214.14|
|129280602|    Household|3914725.6599999997|
|473105037|      Clothes|         125562.72|
|754046475|    Cosmetics|         3481860.8|
|772153747|       Fruits|          58844.31|
|847788178|       Snacks|        1253749.86|
|471623599|    Cosmetics|1205797.5999999999|
+---------+-------------+------------------+
only showing top 10 rows



In [19]:
# Keep rows matching both conditions (combined with &), then project the
# columns of interest plus the computed TOTAL_PRICE.
(
    sales_df
    .filter((col('Region') == 'Europe') & (col('Country') == 'Spain'))
    .select(
        col('Order_ID'),
        col('Country'),
        col('Item_Type'),
        expr("Units_Sold * Unit_Price as TOTAL_PRICE"),
    )
    .show(5)
)


+---------+-------+-------------+------------------+
| Order_ID|Country|    Item_Type|       TOTAL_PRICE|
+---------+-------+-------------+------------------+
|860891091|  Spain|Personal Care|462591.80000000005|
|413236844|  Spain|    Household|         3221061.4|
|621470248|  Spain|      Clothes|         624207.36|
|337587821|  Spain|   Vegetables|        1323837.58|
|420354354|  Spain|       Snacks|         160056.42|
+---------+-------+-------------+------------------+
only showing top 5 rows



In [20]:
# Same query expressed as two chained where() calls instead of one
# combined condition.
(
    sales_df
    .where(col('Region') == 'Europe')
    .where(col('Country') == 'Spain')
    .select(
        col('Order_ID'),
        col('Country'),
        col('Item_Type'),
        expr("Units_Sold * Unit_Price as TOTAL_PRICE"),
    )
    .show(5)
)


+---------+-------+-------------+------------------+
| Order_ID|Country|    Item_Type|       TOTAL_PRICE|
+---------+-------+-------------+------------------+
|860891091|  Spain|Personal Care|462591.80000000005|
|413236844|  Spain|    Household|         3221061.4|
|621470248|  Spain|      Clothes|         624207.36|
|337587821|  Spain|   Vegetables|        1323837.58|
|420354354|  Spain|       Snacks|         160056.42|
+---------+-------+-------------+------------------+
only showing top 5 rows



## ORDER BY

In [21]:
# Sort by units sold (descending) and, within ties, by country (ascending).
(
    sales_df
    .select(col('Order_ID'), col('Country'), col('Item_Type'), col('Units_Sold'))
    .orderBy(col('Units_Sold').desc(), col('Country').asc())
    .show(20, truncate=False)
)


+---------+------------------------------+---------------+----------+
|Order_ID |Country                       |Item_Type      |Units_Sold|
+---------+------------------------------+---------------+----------+
|257909476|Cape Verde                    |Household      |10000     |
|261322534|Comoros                       |Cosmetics      |10000     |
|122941577|Federated States of Micronesia|Vegetables     |10000     |
|225874030|Ghana                         |Cereal         |10000     |
|143555104|Iceland                       |Personal Care  |10000     |
|573532950|Iran                          |Snacks         |10000     |
|885743367|Panama                        |Meat           |10000     |
|230469834|Qatar                         |Meat           |10000     |
|240709006|Andorra                       |Beverages      |9999      |
|895982539|Cameroon                      |Beverages      |9999      |
|572350203|Cote d'Ivoire                 |Snacks         |9999      |
|264735591|Cote d'Iv

In [23]:
# Sort ascending by region, placing null regions (if any) first.
(
    sales_df
    .select('Region', 'Country')
    .orderBy(col('Region').asc_nulls_first())
    .show()
)


+------+-----------+
|Region|    Country|
+------+-----------+
|  Asia|   Thailand|
|  Asia|   Maldives|
|  Asia|South Korea|
|  Asia| Kazakhstan|
|  Asia|     Taiwan|
|  Asia|  Singapore|
|  Asia|       Laos|
|  Asia| Kazakhstan|
|  Asia|   Mongolia|
|  Asia| Uzbekistan|
|  Asia|   Cambodia|
|  Asia|    Vietnam|
|  Asia|   Cambodia|
|  Asia|   Thailand|
|  Asia|South Korea|
|  Asia|   Mongolia|
|  Asia| Kazakhstan|
|  Asia|South Korea|
|  Asia| Kazakhstan|
|  Asia| Kyrgyzstan|
+------+-----------+
only showing top 20 rows



In [22]:
# Distinct values of the Region column: print how many there are, then list them.
regiones_df = sales_df.select('Region').distinct()

print(regiones_df.count())

regiones_df.show(truncate=False)


7
+---------------------------------+
|Region                           |
+---------------------------------+
|Middle East and North Africa     |
|Australia and Oceania            |
|Europe                           |
|Sub-Saharan Africa               |
|Central America and the Caribbean|
|North America                    |
|Asia                             |
+---------------------------------+



## LIMIT()

In [24]:
# limit(20) caps the sorted result at 20 rows; count() confirms the size.
(
    sales_df
    .select(col('Order_ID'), col('Country'), col('Item_Type'), col('Units_Sold'))
    .orderBy(col('Units_Sold').desc(), col('Country').asc())
    .limit(20)
    .count()
)


20

## EJERCICIO
### Devolver los campos producto, unidades vendidas, fecha de pedido y fecha de envío de las ventas de la Zona Logística de Asia, ordenadas por país. Solo nos interesan los 10 primeros.

## MODIFICAR DATOS DATAFRAME

In [25]:
from pyspark.sql.functions import lit

# Constant value: add a "Sent" column holding False on every row.
con_enviado = sales_df.withColumn("Sent", lit(False))
con_enviado.show(5)

# Computed field: add "Total_Price" derived from two existing columns.
con_total = sales_df.withColumn("Total_Price", expr("Units_Sold *  Unit_Price"))
con_total.show(5)


+---------+----------+-------------+----------+----------+--------------------+--------------------+----------+-----+
| Order_ID|Order_Date|    Item_Type|Units_Sold|Unit_Price|              Region|             Country| Ship_Date| Sent|
+---------+----------+-------------+----------+----------+--------------------+--------------------+----------+-----+
|535113847| 10/8/2014|       Snacks|       934|    152.58|Middle East and N...|          Azerbaijan|10/23/2014|false|
|874708545| 2/22/2015|    Cosmetics|      4551|     437.2|Central America a...|              Panama| 2/27/2015|false|
|854349935| 12/9/2015|       Fruits|      9986|      9.33|  Sub-Saharan Africa|Sao Tome and Prin...| 1/18/2016|false|
|892836844| 9/17/2014|Personal Care|      9118|     81.73|  Sub-Saharan Africa|Sao Tome and Prin...|10/12/2014|false|
|129280602|  2/4/2010|    Household|      5858|    668.27|Central America a...|              Belize|  3/5/2010|false|
+---------+----------+-------------+----------+---------

## Renamed

In [26]:
# printSchema() writes the schema itself and returns None, so it must not be
# wrapped in print() (that emitted a stray "None" line after the schema).
sales_df.printSchema()

# withColumnRenamed returns a new DataFrame; sales_df itself keeps 'Region'.
sales_df.withColumnRenamed('Region','Logist_Area').show()

root
 |-- Order_ID: integer (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Ship_Date: string (nullable = true)

None
+---------+----------+---------------+----------+----------+--------------------+--------------------+----------+
| Order_ID|Order_Date|      Item_Type|Units_Sold|Unit_Price|         Logist_Area|             Country| Ship_Date|
+---------+----------+---------------+----------+----------+--------------------+--------------------+----------+
|535113847| 10/8/2014|         Snacks|       934|    152.58|Middle East and N...|          Azerbaijan|10/23/2014|
|874708545| 2/22/2015|      Cosmetics|      4551|     437.2|Central America a...|              Panama| 2/27/2015|
|854349935| 12/9/2015|         Fruits|      9986|      9.33|  Sub-Saharan Africa|Sao Tome an

In [27]:
# Keep the result this time: sales data plus the computed Total_Price column.
resumen_df = sales_df.withColumn(
    "Total_Price",
    expr("Units_Sold *  Unit_Price"),
)

resumen_df.show(5)


+---------+----------+-------------+----------+----------+--------------------+--------------------+----------+------------------+
| Order_ID|Order_Date|    Item_Type|Units_Sold|Unit_Price|              Region|             Country| Ship_Date|       Total_Price|
+---------+----------+-------------+----------+----------+--------------------+--------------------+----------+------------------+
|535113847| 10/8/2014|       Snacks|       934|    152.58|Middle East and N...|          Azerbaijan|10/23/2014|         142509.72|
|874708545| 2/22/2015|    Cosmetics|      4551|     437.2|Central America a...|              Panama| 2/27/2015|         1989697.2|
|854349935| 12/9/2015|       Fruits|      9986|      9.33|  Sub-Saharan Africa|Sao Tome and Prin...| 1/18/2016|          93169.38|
|892836844| 9/17/2014|Personal Care|      9118|     81.73|  Sub-Saharan Africa|Sao Tome and Prin...|10/12/2014|         745214.14|
|129280602|  2/4/2010|    Household|      5858|    668.27|Central America a...|    

## DROP

In [28]:
# Remove two columns at once; drop() returns a new DataFrame.
resumen_df2 = resumen_df.drop(
    'Unit_Price',
    'Region',
)

# The schema no longer lists the dropped columns.
resumen_df2.printSchema()


root
 |-- Order_ID: integer (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Ship_Date: string (nullable = true)
 |-- Total_Price: double (nullable = true)



In [29]:
# Before: the hand-built DataFrame, including the row with a null salary.
df.show()

# Drop the rows whose salary is null.
not_null_df = df.dropna(subset=['salary'])

# After: only rows with a salary remain.
not_null_df.show()


+---------+-------------+-----+------+------+
|firstname|     lastname|   id|gender|salary|
+---------+-------------+-----+------+------+
|     NULL|     Smith   |36636|     M|  3500|
|  Michael|         Rose|40288|     M|  4750|
|   Robert|     Williams|42114|     M|  NULL|
|    Maria|    Jones    |39192|     F|  4000|
+---------+-------------+-----+------+------+

+---------+-------------+-----+------+------+
|firstname|     lastname|   id|gender|salary|
+---------+-------------+-----+------+------+
|     NULL|     Smith   |36636|     M|  3500|
|  Michael|         Rose|40288|     M|  4750|
|    Maria|    Jones    |39192|     F|  4000|
+---------+-------------+-----+------+------+



In [32]:
# Schema before the cast (Order_ID is an integer).
sales_df.printSchema()

# Replace Order_ID with its string-typed version and show the new schema.
order_id_texto = col('Order_ID').cast(StringType())
sales_df.withColumn('Order_ID', order_id_texto).printSchema()


root
 |-- Order_ID: integer (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Ship_Date: string (nullable = true)

root
 |-- Order_ID: string (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Ship_Date: string (nullable = true)



In [33]:
from pyspark.sql.functions import to_date

# Schema before the conversion (Order_Date is a string).
sales_df.printSchema()

# Order_Date values look like '10/8/2014' (month/day/year, no zero padding).
# A plain cast('date') cannot parse that layout and produces NULL dates, so
# parse with an explicit pattern instead. The resulting column type is date.
sales_df.withColumn('Order_Date', to_date(col('Order_Date'), 'M/d/yyyy')).printSchema()


root
 |-- Order_ID: integer (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Ship_Date: string (nullable = true)

root
 |-- Order_ID: integer (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Ship_Date: string (nullable = true)



## TRIM

In [31]:
from pyspark.sql.functions import ltrim, rtrim, trim

# Before: last names with leading and/or trailing spaces.
df.show()

# trim() strips spaces on both sides of the value.
apellido_limpio = trim(col('lastname'))
corregido2 = df.withColumn('lastname', apellido_limpio)

# After: last names without surrounding spaces.
corregido2.show()


+---------+-------------+-----+------+------+
|firstname|     lastname|   id|gender|salary|
+---------+-------------+-----+------+------+
|     NULL|     Smith   |36636|     M|  3500|
|  Michael|         Rose|40288|     M|  4750|
|   Robert|     Williams|42114|     M|  NULL|
|    Maria|    Jones    |39192|     F|  4000|
+---------+-------------+-----+------+------+

+---------+--------+-----+------+------+
|firstname|lastname|   id|gender|salary|
+---------+--------+-----+------+------+
|     NULL|   Smith|36636|     M|  3500|
|  Michael|    Rose|40288|     M|  4750|
|   Robert|Williams|42114|     M|  NULL|
|    Maria|   Jones|39192|     F|  4000|
+---------+--------+-----+------+------+



## EXTRA: ESCRIBIR (GUARDAR) DATAFRAME RESULTADO, FORMATO PARQUET (DISTRIBUIDO)

In [None]:
# Destination directory for the Parquet output.
# NOTE(review): this is a dbfs: URI while the inputs above are read from
# /content - confirm this cell is meant for the same environment.
path = 'dbfs:/FileStore/shared_uploads/edurf.cld@gmail.com/sales'

# Configure the writer: Parquet format, replace any existing output,
# snappy compression.
parquet_writer = (
    sales_df.write
    .format("parquet")
    .mode("overwrite")
    .option("compression", "snappy")
)
parquet_writer.save(path)


In [None]:
%fs ls 'dbfs:/FileStore/shared_uploads/edurf.cld@gmail.com/sales'
