## Instalação do PySpark

Instalando o Java 8

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Baixando spark

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

Descompatacando o spark

In [3]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

Instalando o findspark

In [4]:
!pip install -q findspark

Instalando o pyspark


In [5]:
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


Definindo as variáveis de ambiente

In [6]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.1.2-bin-hadoop2.7'

Iniciando o spark

In [7]:
import findspark
findspark.init()

##Criando uma sessão


In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
                    .master('local[*]')\
                    .appName('Spark03')\
                    .getOrCreate()

##Convertendo tipo de produtos


Criando um dataframe sobre produtos

In [9]:
meus_produtos = [
    {'ID': 1, 'Produto': 'Notebook', 'Fabricante': 'Dell', 'Garantia': '2030-05-04', 'Valor': '4200,99'},
    {'ID': 2, 'Produto': 'Monitor', 'Fabricante': 'Dell', 'Garantia': '2030-05-04', 'Valor': '750,29'},
    {'ID': 3, 'Produto': 'Memoria Ram 16GB', 'Fabricante': 'Dell', 'Garantia': '2030-05-04', 'Valor': '200,00'},
    {'ID': 4, 'Produto': 'Teclado', 'Fabricante': 'Dell', 'Garantia': '2030-05-04', 'Valor': '350,27'},
    {'ID': 5, 'Produto': 'Mouse', 'Fabricante': 'Dell', 'Garantia': '2030-05-04', 'Valor': '100,50'},
    {'ID': 6, 'Produto': 'Headphone', 'Fabricante': 'Dell', 'Garantia': '2030-05-04', 'Valor': '320,15'},
    {'ID': 7, 'Produto': 'SSD 1TB', 'Fabricante': 'Dell', 'Garantia': '2030-05-04', 'Valor': '759,99'},
]

In [10]:
meu_dataframe = spark.createDataFrame(meus_produtos);

In [11]:
meu_dataframe.show()

+----------+----------+---+----------------+-------+
|Fabricante|  Garantia| ID|         Produto|  Valor|
+----------+----------+---+----------------+-------+
|      Dell|2030-05-04|  1|        Notebook|4200,99|
|      Dell|2030-05-04|  2|         Monitor| 750,29|
|      Dell|2030-05-04|  3|Memoria Ram 16GB| 200,00|
|      Dell|2030-05-04|  4|         Teclado| 350,27|
|      Dell|2030-05-04|  5|           Mouse| 100,50|
|      Dell|2030-05-04|  6|       Headphone| 320,15|
|      Dell|2030-05-04|  7|         SSD 1TB| 759,99|
+----------+----------+---+----------------+-------+



In [12]:
meu_dataframe.printSchema()

root
 |-- Fabricante: string (nullable = true)
 |-- Garantia: string (nullable = true)
 |-- ID: long (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Valor: string (nullable = true)



Convertendo preço(string) para numeros(float)


In [13]:
##mudando as virgulas por pontos
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as f

meu_dataframe = meu_dataframe.withColumn('Valor', f.regexp_replace('Valor', ',', '.'))

In [16]:
meu_dataframe.show()

+----------+----------+---+----------------+-------+
|Fabricante|  Garantia| ID|         Produto|  Valor|
+----------+----------+---+----------------+-------+
|      Dell|2030-05-04|  1|        Notebook|4200.99|
|      Dell|2030-05-04|  2|         Monitor| 750.29|
|      Dell|2030-05-04|  3|Memoria Ram 16GB|  200.0|
|      Dell|2030-05-04|  4|         Teclado| 350.27|
|      Dell|2030-05-04|  5|           Mouse|  100.5|
|      Dell|2030-05-04|  6|       Headphone| 320.15|
|      Dell|2030-05-04|  7|         SSD 1TB| 759.99|
+----------+----------+---+----------------+-------+



In [14]:
##mudando para numeros - double
meu_dataframe = meu_dataframe.withColumn('Valor', meu_dataframe['Valor'].cast(DoubleType()))

In [15]:
meu_dataframe.printSchema()

root
 |-- Fabricante: string (nullable = true)
 |-- Garantia: string (nullable = true)
 |-- ID: long (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Valor: double (nullable = true)



In [17]:
##mudando datas
from pyspark.sql.types import StringType
from pyspark.sql import functions as f

meu_dataframe = meu_dataframe.withColumn('Garantia', f.to_date(meu_dataframe.Garantia.cast(StringType()), 'yyyy-MM-dd'))

In [18]:
meu_dataframe.printSchema()

root
 |-- Fabricante: string (nullable = true)
 |-- Garantia: date (nullable = true)
 |-- ID: long (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Valor: double (nullable = true)



##Formula 1

In [20]:
path = '/content/Formula1/drivers.csv'
dataframe = spark.read.csv(path,sep=',', inferSchema=True, header=True)

In [21]:
dataframe.printSchema()

root
 |-- driverId: integer (nullable = true)
 |-- driverRef: string (nullable = true)
 |-- number: string (nullable = true)
 |-- code: string (nullable = true)
 |-- forename: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- url: string (nullable = true)



In [22]:
dataframe.show(25)

+--------+---------------+------+----+----------+-----------+----------+-----------+--------------------+
|driverId|      driverRef|number|code|  forename|    surname|       dob|nationality|                 url|
+--------+---------------+------+----+----------+-----------+----------+-----------+--------------------+
|       1|       hamilton|    44| HAM|     Lewis|   Hamilton|1985-01-07|    British|http://en.wikiped...|
|       2|       heidfeld|    \N| HEI|      Nick|   Heidfeld|1977-05-10|     German|http://en.wikiped...|
|       3|        rosberg|     6| ROS|      Nico|    Rosberg|1985-06-27|     German|http://en.wikiped...|
|       4|         alonso|    14| ALO|  Fernando|     Alonso|1981-07-29|    Spanish|http://en.wikiped...|
|       5|     kovalainen|    \N| KOV|    Heikki| Kovalainen|1981-10-19|    Finnish|http://en.wikiped...|
|       6|       nakajima|    \N| NAK|    Kazuki|   Nakajima|1985-01-11|   Japanese|http://en.wikiped...|
|       7|       bourdais|    \N| BOU| Sébasti

Removendo colunas(dados)

In [25]:
dataframe = dataframe.drop('url')
dataframe = dataframe.drop('number')


In [26]:
dataframe.show(25)

+--------+---------------+----+----------+-----------+----------+-----------+
|driverId|      driverRef|code|  forename|    surname|       dob|nationality|
+--------+---------------+----+----------+-----------+----------+-----------+
|       1|       hamilton| HAM|     Lewis|   Hamilton|1985-01-07|    British|
|       2|       heidfeld| HEI|      Nick|   Heidfeld|1977-05-10|     German|
|       3|        rosberg| ROS|      Nico|    Rosberg|1985-06-27|     German|
|       4|         alonso| ALO|  Fernando|     Alonso|1981-07-29|    Spanish|
|       5|     kovalainen| KOV|    Heikki| Kovalainen|1981-10-19|    Finnish|
|       6|       nakajima| NAK|    Kazuki|   Nakajima|1985-01-11|   Japanese|
|       7|       bourdais| BOU| Sébastien|   Bourdais|1979-02-28|     French|
|       8|      raikkonen| RAI|      Kimi|  Räikkönen|1979-10-17|    Finnish|
|       9|         kubica| KUB|    Robert|     Kubica|1984-12-07|     Polish|
|      10|          glock| GLO|      Timo|      Glock|1982-03-18

In [27]:
dataframe.printSchema()

root
 |-- driverId: integer (nullable = true)
 |-- driverRef: string (nullable = true)
 |-- code: string (nullable = true)
 |-- forename: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- nationality: string (nullable = true)



In [28]:
#mudando dob para date
from pyspark.sql.types import StringType
from pyspark.sql import functions as f

dataframe = dataframe.withColumn('dob', f.to_date(dataframe.dob.cast(StringType()), 'yyyy-MM-dd'))

In [29]:
dataframe.printSchema()

root
 |-- driverId: integer (nullable = true)
 |-- driverRef: string (nullable = true)
 |-- code: string (nullable = true)
 |-- forename: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- nationality: string (nullable = true)



Alterando nome das colunas


In [30]:
#driverId|driverRef|code|forename|surname|dob|nationality
dataframe = dataframe.toDF('id_piloto', 'ref_piloto', 'codigo', 'prim_nome', 'ult_nome', 'dt_nasc', 'nacionalidade')

In [31]:
dataframe.show(25)

+---------+---------------+------+----------+-----------+----------+-------------+
|id_piloto|     ref_piloto|codigo| prim_nome|   ult_nome|   dt_nasc|nacionalidade|
+---------+---------------+------+----------+-----------+----------+-------------+
|        1|       hamilton|   HAM|     Lewis|   Hamilton|1985-01-07|      British|
|        2|       heidfeld|   HEI|      Nick|   Heidfeld|1977-05-10|       German|
|        3|        rosberg|   ROS|      Nico|    Rosberg|1985-06-27|       German|
|        4|         alonso|   ALO|  Fernando|     Alonso|1981-07-29|      Spanish|
|        5|     kovalainen|   KOV|    Heikki| Kovalainen|1981-10-19|      Finnish|
|        6|       nakajima|   NAK|    Kazuki|   Nakajima|1985-01-11|     Japanese|
|        7|       bourdais|   BOU| Sébastien|   Bourdais|1979-02-28|       French|
|        8|      raikkonen|   RAI|      Kimi|  Räikkönen|1979-10-17|      Finnish|
|        9|         kubica|   KUB|    Robert|     Kubica|1984-12-07|       Polish|
|   

##Consultas

In [32]:
#exibindo todos os dados
dataframe.select('*').show(25)

+---------+---------------+------+----------+-----------+----------+-------------+
|id_piloto|     ref_piloto|codigo| prim_nome|   ult_nome|   dt_nasc|nacionalidade|
+---------+---------------+------+----------+-----------+----------+-------------+
|        1|       hamilton|   HAM|     Lewis|   Hamilton|1985-01-07|      British|
|        2|       heidfeld|   HEI|      Nick|   Heidfeld|1977-05-10|       German|
|        3|        rosberg|   ROS|      Nico|    Rosberg|1985-06-27|       German|
|        4|         alonso|   ALO|  Fernando|     Alonso|1981-07-29|      Spanish|
|        5|     kovalainen|   KOV|    Heikki| Kovalainen|1981-10-19|      Finnish|
|        6|       nakajima|   NAK|    Kazuki|   Nakajima|1985-01-11|     Japanese|
|        7|       bourdais|   BOU| Sébastien|   Bourdais|1979-02-28|       French|
|        8|      raikkonen|   RAI|      Kimi|  Räikkönen|1979-10-17|      Finnish|
|        9|         kubica|   KUB|    Robert|     Kubica|1984-12-07|       Polish|
|   

In [33]:
#exibindo a nacionalidade, primeiro nome, ultimo nome
dataframe.select('nacionalidade', 'prim_nome', 'ult_nome').show(25)

+-------------+----------+-----------+
|nacionalidade| prim_nome|   ult_nome|
+-------------+----------+-----------+
|      British|     Lewis|   Hamilton|
|       German|      Nick|   Heidfeld|
|       German|      Nico|    Rosberg|
|      Spanish|  Fernando|     Alonso|
|      Finnish|    Heikki| Kovalainen|
|     Japanese|    Kazuki|   Nakajima|
|       French| Sébastien|   Bourdais|
|      Finnish|      Kimi|  Räikkönen|
|       Polish|    Robert|     Kubica|
|       German|      Timo|      Glock|
|     Japanese|    Takuma|       Sato|
|    Brazilian|    Nelson| Piquet Jr.|
|    Brazilian|    Felipe|      Massa|
|      British|     David|  Coulthard|
|      Italian|     Jarno|     Trulli|
|       German|    Adrian|      Sutil|
|   Australian|      Mark|     Webber|
|      British|    Jenson|     Button|
|      British|   Anthony|   Davidson|
|       German| Sebastian|     Vettel|
|      Italian| Giancarlo| Fisichella|
|    Brazilian|    Rubens|Barrichello|
|       German|      Ralf

In [34]:
#exibindo o ref_piloto e dt_nasc
dataframe.select('ref_piloto', 'dt_nasc').show(25)

+---------------+----------+
|     ref_piloto|   dt_nasc|
+---------------+----------+
|       hamilton|1985-01-07|
|       heidfeld|1977-05-10|
|        rosberg|1985-06-27|
|         alonso|1981-07-29|
|     kovalainen|1981-10-19|
|       nakajima|1985-01-11|
|       bourdais|1979-02-28|
|      raikkonen|1979-10-17|
|         kubica|1984-12-07|
|          glock|1982-03-18|
|           sato|1977-01-28|
|      piquet_jr|1985-07-25|
|          massa|1981-04-25|
|      coulthard|1971-03-27|
|         trulli|1974-07-13|
|          sutil|1983-01-11|
|         webber|1976-08-27|
|         button|1980-01-19|
|       davidson|1979-04-18|
|         vettel|1987-07-03|
|     fisichella|1973-01-14|
|    barrichello|1972-05-23|
|ralf_schumacher|1975-06-30|
|         liuzzi|1980-08-06|
|           wurz|1974-02-15|
+---------------+----------+
only showing top 25 rows



In [36]:
#exibindo o ref_piloto e dt_nasc(ANO), mudando o nome apenas nessa consulta
dataframe.select('ref_piloto', f.year('dt_nasc').alias('Ano de Nascimento')).show(25)

+---------------+-----------------+
|     ref_piloto|Ano de Nascimento|
+---------------+-----------------+
|       hamilton|             1985|
|       heidfeld|             1977|
|        rosberg|             1985|
|         alonso|             1981|
|     kovalainen|             1981|
|       nakajima|             1985|
|       bourdais|             1979|
|      raikkonen|             1979|
|         kubica|             1984|
|          glock|             1982|
|           sato|             1977|
|      piquet_jr|             1985|
|          massa|             1981|
|      coulthard|             1971|
|         trulli|             1974|
|          sutil|             1983|
|         webber|             1976|
|         button|             1980|
|       davidson|             1979|
|         vettel|             1987|
|     fisichella|             1973|
|    barrichello|             1972|
|ralf_schumacher|             1975|
|         liuzzi|             1980|
|           wurz|           

In [38]:
#exibindo com filtros - pilotos nascidos em 85
dataframe.select('ref_piloto', f.year('dt_nasc').alias('ano_nascimento')).where('ano_nascimento==1985').show(25)

+----------+--------------+
|ref_piloto|ano_nascimento|
+----------+--------------+
|  hamilton|          1985|
|   rosberg|          1985|
|  nakajima|          1985|
| piquet_jr|          1985|
| maldonado|          1985|
|  ambrosio|          1985|
|     garde|          1985|
+----------+--------------+



In [41]:
#exibindo codiigo, prim_nome, ult_nome apenas dos pilotos brasileiros
dataframe.select('codigo', 'prim_nome', 'ult_nome', 'nacionalidade').where('nacionalidade=="Brazilian"').show(25)

+------+---------+-----------+-------------+
|codigo|prim_nome|   ult_nome|nacionalidade|
+------+---------+-----------+-------------+
|   PIQ|   Nelson| Piquet Jr.|    Brazilian|
|   MAS|   Felipe|      Massa|    Brazilian|
|   BAR|   Rubens|Barrichello|    Brazilian|
|   ZON|  Ricardo|      Zonta|    Brazilian|
|   PIZ|  Antônio|   Pizzonia|    Brazilian|
|    \N|Cristiano|   da Matta|    Brazilian|
|    \N|  Luciano|      Burti|    Brazilian|
|    \N|    Tarso|    Marques|    Brazilian|
|    \N|  Enrique|   Bernoldi|    Brazilian|
|    \N|    Pedro|      Diniz|    Brazilian|
|    \N|  Ricardo|     Rosset|    Brazilian|
|    \N|  Roberto|     Moreno|    Brazilian|
|    \N|   Ayrton|      Senna|    Brazilian|
|    \N|Christian| Fittipaldi|    Brazilian|
|    \N| Maurício|   Gugelmin|    Brazilian|
|    \N|   Nelson|     Piquet|    Brazilian|
|    \N|    Chico|      Serra|    Brazilian|
|    \N|     Raul|     Boesel|    Brazilian|
|    \N|  Emerson| Fittipaldi|    Brazilian|
|    \N|  

In [46]:
#exibindo todas as informações dos pilotos que nasceram na decada de 70
dataframe.select('*').where('dt_nasc >= "1970-01-01" AND dt_nasc < "1979-12-31"').show(25)

+---------+---------------+------+----------+-------------+----------+-------------+
|id_piloto|     ref_piloto|codigo| prim_nome|     ult_nome|   dt_nasc|nacionalidade|
+---------+---------------+------+----------+-------------+----------+-------------+
|        2|       heidfeld|   HEI|      Nick|     Heidfeld|1977-05-10|       German|
|        7|       bourdais|   BOU| Sébastien|     Bourdais|1979-02-28|       French|
|        8|      raikkonen|   RAI|      Kimi|    Räikkönen|1979-10-17|      Finnish|
|       11|           sato|   SAT|    Takuma|         Sato|1977-01-28|     Japanese|
|       14|      coulthard|   COU|     David|    Coulthard|1971-03-27|      British|
|       15|         trulli|   TRU|     Jarno|       Trulli|1974-07-13|      Italian|
|       17|         webber|   WEB|      Mark|       Webber|1976-08-27|   Australian|
|       19|       davidson|   DAV|   Anthony|     Davidson|1979-04-18|      British|
|       21|     fisichella|   FIS| Giancarlo|   Fisichella|1973-0