# Uma Introdução ao PySpark


## Instalando o PySpark


In [1]:
!pip install pyspark py4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 47 kB/s 
[?25hCollecting py4j
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[K     |████████████████████████████████| 200 kB 53.5 MB/s 
[?25h  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 56.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=73a3e3217556f55f411b40191d89ae72fff739353c69f36ef2a4032befe76f9d
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


## Inicializando uma sessão no Spark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
          .builder \
          .appName("CDIA4-22-PySpark") \
          .getOrCreate()


spark

In [3]:
# Obtendo o contexto do spark
sc = spark.sparkContext
sc

## Criando um `DataFrame` a partir de uma `RDD`

In [4]:
# Creates a random list of something
rdd = sc.parallelize([
  ("XS", 2018, 5.65, 2.79, 6.24), 
  ("XR", 2018, 5.94, 2.98, 6.84), 
  ("X10", 2017, 5.65, 2.79, 6.13), 
  ("8Plus", 2017, 6.23, 3.07, 7.12)
])

# Use these names as the column names
cols = ['Model', 'Year', 'Height', 'Width', 'Weight']

# Create a DataFrame
df = spark.createDataFrame(rdd, schema=cols)
df.show()

+-----+----+------+-----+------+
|Model|Year|Height|Width|Weight|
+-----+----+------+-----+------+
|   XS|2018|  5.65| 2.79|  6.24|
|   XR|2018|  5.94| 2.98|  6.84|
|  X10|2017|  5.65| 2.79|  6.13|
|8Plus|2017|  6.23| 3.07|  7.12|
+-----+----+------+-----+------+



## Criando um `DataFrame` de arquivos

In [6]:
df = spark.read.csv("/content/people.csv", header=True, inferSchema=True)
df.show()

+---------+---------+---+------+
|firstname| lastname|age|gender|
+---------+---------+---+------+
|     Liam|    Smith| 12|     M|
|     Noah|  Johnson| 14|     M|
|   Oliver| Williams| 30|     M|
|   Elijah|    Brown| 24|     M|
|    James|    Jones| 56|     M|
|  William|   Miller| 23|     M|
| Benjamin|    Davis| 67|     M|
|    Lucas|   Garcia| 50|     M|
|    Henry|Rodriguez| 15|     M|
|   Olivia|   Wilson| 37|     F|
|     Emma| Martinez| 32|     F|
|Charlotte| Anderson| 46|     F|
|   Amelia|   Taylor| 42|     F|
|      Ava|   Thomas| 11|     F|
|   Sophia|Hernandez| 70|     F|
| Isabella|    Moore| 66|     F|
|      Mia|   Martin| 33|     F|
|   Evelyn|  Jackson| 19|     F|
|   Harper| Thompson| 18|     F|
+---------+---------+---+------+



## Inspecionando `DataFrames`

In [7]:
df.dtypes

[('firstname', 'string'),
 ('lastname', 'string'),
 ('age', 'int'),
 ('gender', 'string')]

In [8]:
df.head(3)

[Row(firstname='Liam', lastname='Smith', age=12, gender='M'),
 Row(firstname='Noah', lastname='Johnson', age=14, gender='M'),
 Row(firstname='Oliver', lastname='Williams', age=30, gender='M')]

In [10]:
df.first()

Row(firstname='Liam', lastname='Smith', age=12, gender='M')

In [9]:
df.take(3)

[Row(firstname='Liam', lastname='Smith', age=12, gender='M'),
 Row(firstname='Noah', lastname='Johnson', age=14, gender='M'),
 Row(firstname='Oliver', lastname='Williams', age=30, gender='M')]

In [11]:
df.schema

StructType([StructField('firstname', StringType(), True), StructField('lastname', StringType(), True), StructField('age', IntegerType(), True), StructField('gender', StringType(), True)])

In [13]:
df.describe().show()

+-------+---------+--------+------------------+------+
|summary|firstname|lastname|               age|gender|
+-------+---------+--------+------------------+------+
|  count|       19|      19|                19|    19|
|   mean|     null|    null|              35.0|  null|
| stddev|     null|    null|19.499287736279555|  null|
|    min|   Amelia|Anderson|                11|     F|
|    max|  William|  Wilson|                70|     M|
+-------+---------+--------+------------------+------+



In [14]:
df.columns

['firstname', 'lastname', 'age', 'gender']

In [15]:
df.count()

19

In [16]:
df.distinct().count()

19

In [17]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)



In [18]:
df.explain()

== Physical Plan ==
FileScan csv [firstname#73,lastname#74,age#75,gender#76] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/people.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<firstname:string,lastname:string,age:int,gender:string>




## Queries

### `select()`

In [19]:
df.select("firstname").show()

+---------+
|firstname|
+---------+
|     Liam|
|     Noah|
|   Oliver|
|   Elijah|
|    James|
|  William|
| Benjamin|
|    Lucas|
|    Henry|
|   Olivia|
|     Emma|
|Charlotte|
|   Amelia|
|      Ava|
|   Sophia|
| Isabella|
|      Mia|
|   Evelyn|
|   Harper|
+---------+



In [21]:
df.select('firstname', 'age').show()

+---------+---+
|firstname|age|
+---------+---+
|     Liam| 12|
|     Noah| 14|
|   Oliver| 30|
|   Elijah| 24|
|    James| 56|
|  William| 23|
| Benjamin| 67|
|    Lucas| 50|
|    Henry| 15|
|   Olivia| 37|
|     Emma| 32|
|Charlotte| 46|
|   Amelia| 42|
|      Ava| 11|
|   Sophia| 70|
| Isabella| 66|
|      Mia| 33|
|   Evelyn| 19|
|   Harper| 18|
+---------+---+



In [23]:
df.select(df.firstname, df.age + 1).show()

+---------+---------+
|firstname|(age + 1)|
+---------+---------+
|     Liam|       13|
|     Noah|       15|
|   Oliver|       31|
|   Elijah|       25|
|    James|       57|
|  William|       24|
| Benjamin|       68|
|    Lucas|       51|
|    Henry|       16|
|   Olivia|       38|
|     Emma|       33|
|Charlotte|       47|
|   Amelia|       43|
|      Ava|       12|
|   Sophia|       71|
| Isabella|       67|
|      Mia|       34|
|   Evelyn|       20|
|   Harper|       19|
+---------+---------+



In [25]:
df.select(df.firstname, df.age > 30).show()

+---------+----------+
|firstname|(age > 30)|
+---------+----------+
|     Liam|     false|
|     Noah|     false|
|   Oliver|     false|
|   Elijah|     false|
|    James|      true|
|  William|     false|
| Benjamin|      true|
|    Lucas|      true|
|    Henry|     false|
|   Olivia|      true|
|     Emma|      true|
|Charlotte|      true|
|   Amelia|      true|
|      Ava|     false|
|   Sophia|      true|
| Isabella|      true|
|      Mia|      true|
|   Evelyn|     false|
|   Harper|     false|
+---------+----------+



### `.alias()`

In [27]:
df.select(
    df.firstname,
    (df.age + 1).alias("age")
).show()

+---------+---+
|firstname|age|
+---------+---+
|     Liam| 13|
|     Noah| 15|
|   Oliver| 31|
|   Elijah| 25|
|    James| 57|
|  William| 24|
| Benjamin| 68|
|    Lucas| 51|
|    Henry| 16|
|   Olivia| 38|
|     Emma| 33|
|Charlotte| 47|
|   Amelia| 43|
|      Ava| 12|
|   Sophia| 71|
| Isabella| 67|
|      Mia| 34|
|   Evelyn| 20|
|   Harper| 19|
+---------+---+



### `groupBy()`

In [30]:
grouped = df.groupBy("age").count()
grouped.show()

+---+-----+
|age|count|
+---+-----+
| 12|    1|
| 19|    1|
| 15|    1|
| 37|    1|
| 23|    1|
| 50|    1|
| 24|    1|
| 70|    1|
| 32|    1|
| 56|    1|
| 11|    1|
| 33|    1|
| 14|    1|
| 42|    1|
| 30|    1|
| 66|    1|
| 67|    1|
| 46|    1|
| 18|    1|
+---+-----+



### `filter()`

In [31]:
filtered = df.filter(
    df.age > 30
).show()

+---------+---------+---+------+
|firstname| lastname|age|gender|
+---------+---------+---+------+
|    James|    Jones| 56|     M|
| Benjamin|    Davis| 67|     M|
|    Lucas|   Garcia| 50|     M|
|   Olivia|   Wilson| 37|     F|
|     Emma| Martinez| 32|     F|
|Charlotte| Anderson| 46|     F|
|   Amelia|   Taylor| 42|     F|
|   Sophia|Hernandez| 70|     F|
| Isabella|    Moore| 66|     F|
|      Mia|   Martin| 33|     F|
+---------+---------+---+------+



### `substring`

In [32]:
df.select(
    df.firstname.substr(1,3).alias('name')
).show()

+----+
|name|
+----+
| Lia|
| Noa|
| Oli|
| Eli|
| Jam|
| Wil|
| Ben|
| Luc|
| Hen|
| Oli|
| Emm|
| Cha|
| Ame|
| Ava|
| Sop|
| Isa|
| Mia|
| Eve|
| Har|
+----+



### `like()`

In [35]:
from pyspark.sql.functions import col

df.select(
    col("lastname"),
    col("lastname").like("%Tay%")
).show()

+---------+-------------------+
| lastname|lastname LIKE %Tay%|
+---------+-------------------+
|    Smith|              false|
|  Johnson|              false|
| Williams|              false|
|    Brown|              false|
|    Jones|              false|
|   Miller|              false|
|    Davis|              false|
|   Garcia|              false|
|Rodriguez|              false|
|   Wilson|              false|
| Martinez|              false|
| Anderson|              false|
|   Taylor|               true|
|   Thomas|              false|
|Hernandez|              false|
|    Moore|              false|
|   Martin|              false|
|  Jackson|              false|
| Thompson|              false|
+---------+-------------------+



### `startswith()`

In [37]:
df.select(
    col("firstname"),
    col("firstname").startswith("M")
).show()

+---------+------------------------+
|firstname|startswith(firstname, M)|
+---------+------------------------+
|     Liam|                   false|
|     Noah|                   false|
|   Oliver|                   false|
|   Elijah|                   false|
|    James|                   false|
|  William|                   false|
| Benjamin|                   false|
|    Lucas|                   false|
|    Henry|                   false|
|   Olivia|                   false|
|     Emma|                   false|
|Charlotte|                   false|
|   Amelia|                   false|
|      Ava|                   false|
|   Sophia|                   false|
| Isabella|                   false|
|      Mia|                    true|
|   Evelyn|                   false|
|   Harper|                   false|
+---------+------------------------+



### `endswith()`

In [39]:
df.select(
    col("firstname"),
    col("firstname").endswith("a")
).show()

+---------+----------------------+
|firstname|endswith(firstname, a)|
+---------+----------------------+
|     Liam|                 false|
|     Noah|                 false|
|   Oliver|                 false|
|   Elijah|                 false|
|    James|                 false|
|  William|                 false|
| Benjamin|                 false|
|    Lucas|                 false|
|    Henry|                 false|
|   Olivia|                  true|
|     Emma|                  true|
|Charlotte|                 false|
|   Amelia|                  true|
|      Ava|                  true|
|   Sophia|                  true|
| Isabella|                  true|
|      Mia|                  true|
|   Evelyn|                 false|
|   Harper|                 false|
+---------+----------------------+



### `between()`

In [43]:
df.select(
    col("age"),
    col("age").between(22,30)
).show()

+---+-----------------------------+
|age|((age >= 22) AND (age <= 30))|
+---+-----------------------------+
| 20|                        false|
| 21|                        false|
| 19|                        false|
| 20|                        false|
| 22|                         true|
+---+-----------------------------+



### Explodindo as colunas

In [44]:
# Motivação do problema: Situação inicial
# nome, idade, cursos em que a pessoa está matriculada
data = [('Jaya', '20', ['SQL','Data Science']),
        ('Milan', '21', ['ML','AI']),
        ('Rohit', '19', ['Programming', 'DSA']),
        ('Maria', '20', ['DBMS', 'Networking']),
        ('Jay', '22', ['Data Analytics','ML'])]

columns = ['Name', 'Age', 'Courses_enrolled']

df = spark.createDataFrame(data, columns)
df.printSchema()
df.show()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Courses_enrolled: array (nullable = true)
 |    |-- element: string (containsNull = true)

+-----+---+--------------------+
| Name|Age|    Courses_enrolled|
+-----+---+--------------------+
| Jaya| 20| [SQL, Data Science]|
|Milan| 21|            [ML, AI]|
|Rohit| 19|  [Programming, DSA]|
|Maria| 20|  [DBMS, Networking]|
|  Jay| 22|[Data Analytics, ML]|
+-----+---+--------------------+



In [45]:
from pyspark.sql.functions import explode

exploded_df = df.select(
    df.Name,
    explode(df.Courses_enrolled)
)
exploded_df.printSchema()
exploded_df.show()

root
 |-- Name: string (nullable = true)
 |-- col: string (nullable = true)

+-----+--------------+
| Name|           col|
+-----+--------------+
| Jaya|           SQL|
| Jaya|  Data Science|
|Milan|            ML|
|Milan|            AI|
|Rohit|   Programming|
|Rohit|           DSA|
|Maria|          DBMS|
|Maria|    Networking|
|  Jay|Data Analytics|
|  Jay|            ML|
+-----+--------------+



### Soluções dos Exercícios 

Considerando o dataset `movies.csv`, responda às questões do Practice 04.

In [46]:
# Importando o movies.csv
df_movies = spark.read.csv("/content/movies.csv", header=True, inferSchema=True)
df_movies.show()

+--------------------+---------+--------------------+--------------+-------------+---------------+---------------+----+
|                film|    genre|         lead-studio|audience-score|profitability|rotten-tomatoes|worldwide-gross|year|
+--------------------+---------+--------------------+--------------+-------------+---------------+---------------+----+
|Zack and Miri Mak...|  Romance|The Weinstein Com...|            70|  1.747541667|             64|          41.94|2008|
|     Youth in Revolt|   Comedy|The Weinstein Com...|            52|         1.09|             68|          19.62|2010|
|You Will Meet a T...|   Comedy|         Independent|            35|  1.211818182|             43|          26.66|2010|
|        When in Rome|   Comedy|              Disney|            44|          0.0|             15|          43.04|2010|
|What Happens in V...|   Comedy|                 Fox|            72|  6.267647029|             28|         219.37|2008|
| Water For Elephants|    Drama|    20th

In [47]:
df_movies.printSchema()

root
 |-- film: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- lead-studio: string (nullable = true)
 |-- audience-score: integer (nullable = true)
 |-- profitability: double (nullable = true)
 |-- rotten-tomatoes: integer (nullable = true)
 |-- worldwide-gross: double (nullable = true)
 |-- year: integer (nullable = true)



Importação dos módulos para a solução dos exercícios

In [None]:
from pyspark.sql.functions import col

In [48]:
# P01. Obtendo uma descrição estatística para os campos numéricos

df_movies.describe().show()

+-------+--------------------+-------+----------------+------------------+-----------------+------------------+------------------+------------------+
|summary|                film|  genre|     lead-studio|    audience-score|    profitability|   rotten-tomatoes|   worldwide-gross|              year|
+-------+--------------------+-------+----------------+------------------+-----------------+------------------+------------------+------------------+
|  count|                  74|     74|              74|                74|               74|                74|                74|                74|
|   mean|                null|   null|            null| 63.91891891891892|4.549382372391891| 46.91891891891892|136.35081081081074| 2009.054054054054|
| stddev|                null|   null|            null|13.683062979972128|8.174251914350178|26.332310876099108|157.06578623243522|1.3537557284756396|
|    min|(500) Days of Summer| Action|20th Century Fox|                35|              0.0|        

In [49]:
# P02. Que filmes tem audiência maior que 70?

df_movies.filter(
    col("audience-score") > 70
).show()

+--------------------+---------+--------------------+--------------+-------------+---------------+---------------+----+
|                film|    genre|         lead-studio|audience-score|profitability|rotten-tomatoes|worldwide-gross|year|
+--------------------+---------+--------------------+--------------+-------------+---------------+---------------+----+
|What Happens in V...|   Comedy|                 Fox|            72|  6.267647029|             28|         219.37|2008|
| Water For Elephants|    Drama|    20th Century Fox|            72|  3.081421053|             60|         117.09|2011|
|              WALL-E|Animation|              Disney|            89|  2.896019067|             96|         521.28|2008|
|            Twilight|  Romance|              Summit|            82|  10.18002703|             49|         376.66|2008|
|The Twilight Saga...|    Drama|              Summit|            78|      14.1964|             27|         709.82|2009|
|        The Proposal|   Comedy|        

In [50]:
# P03. Conte quantos filmes tem para cada ano

df_movies.groupBy(
  col("year")
).count().show()

+----+-----+
|year|count|
+----+-----+
|2007|   11|
|2009|   12|
|2010|   19|
|2011|   13|
|2008|   19|
+----+-----+



In [51]:
# P04. Que filmes começam com a palavra "The" nos títulos?

df_movies.select(
    col("film"),
    col("film").startswith("The")
).show()

+--------------------+---------------------+
|                film|startswith(film, The)|
+--------------------+---------------------+
|Zack and Miri Mak...|                false|
|     Youth in Revolt|                false|
|You Will Meet a T...|                false|
|        When in Rome|                false|
|What Happens in V...|                false|
| Water For Elephants|                false|
|              WALL-E|                false|
|            Waitress|                false|
| Waiting For Forever|                false|
|     Valentine's Day|                false|
|Tyler Perry's Why...|                false|
|Twilight: Breakin...|                false|
|            Twilight|                false|
|      The Ugly Truth|                 true|
|The Twilight Saga...|                 true|
|The Time Traveler...|                 true|
|        The Proposal|                 true|
|The Invention of ...|                 true|
|  The Heartbreak Kid|                 true|
|         

In [52]:
# P05. Que filmes tem receita entre 100 e 200?

df_movies.select(
    col("worldwide-gross"),
    col("worldwide-gross").between(100, 200)
).show()

+---------------+-------------------------------------------------------+
|worldwide-gross|((worldwide-gross >= 100) AND (worldwide-gross <= 200))|
+---------------+-------------------------------------------------------+
|          41.94|                                                  false|
|          19.62|                                                  false|
|          26.66|                                                  false|
|          43.04|                                                  false|
|         219.37|                                                  false|
|         117.09|                                                   true|
|         521.28|                                                  false|
|          22.18|                                                  false|
|           0.03|                                                  false|
|         217.57|                                                  false|
|          55.86|                     