# Lesson 8 - PySpark

## Review

### Accessing a Hive table with PySpark

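Create a `HiveContext` from the `SparkContext` `sc`, which the PySpark shell creates automatically. Note that `HiveContext` has been deprecated since Spark 2.0 in favor of `SparkSession.builder.enableHiveSupport()`: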

    from pyspark.sql import HiveContext
    contexto = HiveContext(sc)


Load the table `jobs` from the `hr` database as a DataFrame and display it:

    banco = contexto.table("hr.jobs")
    banco.show()

Register the DataFrame as a temporary table so that it can be queried with SQL. Since Spark 2.0, `registerTempTable` is deprecated in favor of `createOrReplaceTempView`:

    banco.registerTempTable("jobs")
    contexto.sql('Select * from jobs').show()
    contexto.sql('Select *  from jobs order by salario_max DESC limit 1').show()
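The second query sorts by `salario_max` in descending order and keeps only the first row, which is the best-paid job. The sketch below shows that query logic without a cluster. It uses Python's built-in `sqlite3` as a local stand-in for the Hive table, which is an assumption for illustration only. The three rows are a hypothetical subset of `jobs`, and the columns other than `salario_max` are assumed.

```python
import sqlite3

# In-memory SQLite database standing in for the Hive table (illustration only)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (job_title TEXT, salario_min REAL, salario_max REAL)")
conn.executemany(
    "INSERT INTO jobs VALUES (?, ?, ?)",
    [
        ("Programmer", 4000.0, 10000.0),
        ("President", 20000.0, 40000.0),
        ("Stock Clerk", 2000.0, 5000.0),
    ],
)

# Same query as in the Spark cell: highest salario_max first, keep one row
top = conn.execute(
    "SELECT * FROM jobs ORDER BY salario_max DESC LIMIT 1"
).fetchall()
print(top)  # [('President', 20000.0, 40000.0)]
```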

### Creating a DataFrame

The result of a SQL query is itself a DataFrame. Here it is stored in the variable `jobs`:

    jobs = contexto.sql("select * from jobs") 
    jobs.show()
    

### Some useful commands

    jobs.show(100)      # display up to 100 rows
    jobs.printSchema()  # print the column names and types
    # ...

# Getting started with PySpark

To install `pyspark` locally, run the following in a notebook cell:

    pip install pyspark
    
    
**WARNING:** If the cell below fails, you may need to install Java on your machine and restart your computer.

For PySpark to work, you first need to set up its environment by creating a `SparkContext` and a `SparkSession`, as in the cell below.

In [1]:
# imports
from pyspark import sql, SparkContext, HiveContext

# create the SparkContext
sc = SparkContext()

# create the Spark session on top of the SparkContext
spark = sql.SparkSession(sc)

In [2]:
sc

In [3]:
spark

# Reading CSV files with PySpark

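## Creating an RDD from a CSV file

`sc.textFile` reads the file as an RDD with one string per line. `collect()` then returns all of those strings to the driver as a Python list.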

In [4]:
jobs = sc.textFile("data/jobs.csv")
jobs.collect()

['1,Public Accountant,4200.00,9000.00',
 '2,Accounting Manager,8200.00,16000.00',
 '3,Administration Assistant,3000.00,6000.00',
 '4,President,20000.00,40000.00',
 '5,Administration Vice President,15000.00,30000.00',
 '6,Accountant,4200.00,9000.00',
 '7,Finance Manager,8200.00,16000.00',
 '8,Human Resources Representative,4000.00,9000.00',
 '9,Programmer,4000.00,10000.00',
 '10,Marketing Manager,9000.00,15000.00',
 '11,Marketing Representative,4000.00,9000.00',
 '12,Public Relations Representative,4500.00,10500.00',
 '13,Purchasing Clerk,2500.00,5500.00',
 '14,Purchasing Manager,8000.00,15000.00',
 '15,Sales Manager,10000.00,20000.00',
 '16,Sales Representative,6000.00,12000.00',
 '17,Shipping Clerk,2500.00,5500.00',
 '18,Stock Clerk,2000.00,5000.00',
 '19,Stock Manager,5500.00,8500.00']
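Each element of this RDD is still one raw string, so nothing has been split or typed yet. A common next step in Spark is `jobs.map(parse_line)`. The sketch below shows, in plain Python over two of the lines printed above, what such a function could do. `parse_line` is a hypothetical helper, and the column order (id, title, minimum salary, maximum salary) is assumed from the data.

```python
def parse_line(line):
    """Split one line of jobs.csv into typed fields (hypothetical helper)."""
    job_id, title, sal_min, sal_max = line.split(",")
    return int(job_id), title, float(sal_min), float(sal_max)

lines = [
    "1,Public Accountant,4200.00,9000.00",
    "9,Programmer,4000.00,10000.00",
]

# Plain-Python equivalent of rdd.map(parse_line).collect()
parsed = list(map(parse_line, lines))
print(parsed)
# [(1, 'Public Accountant', 4200.0, 9000.0), (9, 'Programmer', 4000.0, 10000.0)]
```

A plain `split(",")` breaks on quoted fields that contain commas. That is one reason to prefer `spark.read.csv` for CSV data.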

## Reading a CSV file into a DataFrame

In [5]:
countries = spark.read.csv("data/countries.csv", header=True)
countries.show()

+----------+------------+---------+
|country_id|country_name|region_id|
+----------+------------+---------+
|        AR|   Argentina|        2|
|        AU|   Australia|        3|
|        BE|     Belgium|        1|
|        BR|      Brazil|        2|
|        CA|      Canada|        2|
|        CH| Switzerland|        1|
|        CN|       China|        3|
|        DE|     Germany|        1|
|        DK|     Denmark|        1|
|        EG|       Egypt|        4|
|        FR|      France|        1|
|        HK|    HongKong|        3|
|        IL|      Israel|        4|
|        IN|       India|        3|
|        IT|       Italy|        1|
|        JP|       Japan|        3|
|        KW|      Kuwait|        4|
|        MX|      Mexico|        2|
|        NG|     Nigeria|        4|
|        NL| Netherlands|        1|
+----------+------------+---------+
only showing top 20 rows
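With `header=True`, the first line of the file supplies the column names. Without an explicit schema or `inferSchema=True`, Spark reads every column as a string, so `region_id` above is text. The sketch below mimics that behaviour with `csv.DictReader`. It is an illustration only, not Spark code, and the two rows are reconstructed from the output above.

```python
import csv
import io

# Two rows of data/countries.csv, reconstructed from the output above
text = "country_id,country_name,region_id\nAR,Argentina,2\nBR,Brazil,2\n"

# Like header=True: the first line becomes the column names
rows = list(csv.DictReader(io.StringIO(text)))
print(rows[0])  # {'country_id': 'AR', 'country_name': 'Argentina', 'region_id': '2'}

# Like the default inferSchema=False: every value stays a string
print(type(rows[0]["region_id"]).__name__)  # str
```

In Spark, adding `inferSchema=True` to `spark.read.csv` makes Spark scan the data and guess column types, so `region_id` would become an integer column.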



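## Adding column names when the file has no header

Without `header=True`, Spark names the columns `_c0`, `_c1`, ... and reads every value as a string. Defining a schema gives the columns meaningful names and types.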

In [6]:
df = spark.read.csv("data/jobs.csv")
df.show()

+---+--------------------+--------+--------+
|_c0|                 _c1|     _c2|     _c3|
+---+--------------------+--------+--------+
|  1|   Public Accountant| 4200.00| 9000.00|
|  2|  Accounting Manager| 8200.00|16000.00|
|  3|Administration As...| 3000.00| 6000.00|
|  4|           President|20000.00|40000.00|
|  5|Administration Vi...|15000.00|30000.00|
|  6|          Accountant| 4200.00| 9000.00|
|  7|     Finance Manager| 8200.00|16000.00|
|  8|Human Resources R...| 4000.00| 9000.00|
|  9|          Programmer| 4000.00|10000.00|
| 10|   Marketing Manager| 9000.00|15000.00|
| 11|Marketing Represe...| 4000.00| 9000.00|
| 12|Public Relations ...| 4500.00|10500.00|
| 13|    Purchasing Clerk| 2500.00| 5500.00|
| 14|  Purchasing Manager| 8000.00|15000.00|
| 15|       Sales Manager|10000.00|20000.00|
| 16|Sales Representative| 6000.00|12000.00|
| 17|      Shipping Clerk| 2500.00| 5500.00|
| 18|         Stock Clerk| 2000.00| 5000.00|
| 19|       Stock Manager| 5500.00| 8500.00|
+---+--------------------+--------+--------+

In [7]:
# import the type classes used to build the schema
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType

# build the schema: each field is (column name, type, nullable)
schema = StructType() \
      .add("indice",IntegerType(),True) \
      .add("job_title",StringType(),True) \
      .add("salario_min",FloatType(),True) \
      .add("salario_max",FloatType(),True) 

# read the file using the explicit schema
df = spark.read.csv("data/jobs.csv", schema=schema)
df.show()

+------+--------------------+-----------+-----------+
|indice|           job_title|salario_min|salario_max|
+------+--------------------+-----------+-----------+
|     1|   Public Accountant|     4200.0|     9000.0|
|     2|  Accounting Manager|     8200.0|    16000.0|
|     3|Administration As...|     3000.0|     6000.0|
|     4|           President|    20000.0|    40000.0|
|     5|Administration Vi...|    15000.0|    30000.0|
|     6|          Accountant|     4200.0|     9000.0|
|     7|     Finance Manager|     8200.0|    16000.0|
|     8|Human Resources R...|     4000.0|     9000.0|
|     9|          Programmer|     4000.0|    10000.0|
|    10|   Marketing Manager|     9000.0|    15000.0|
|    11|Marketing Represe...|     4000.0|     9000.0|
|    12|Public Relations ...|     4500.0|    10500.0|
|    13|    Purchasing Clerk|     2500.0|     5500.0|
|    14|  Purchasing Manager|     8000.0|    15000.0|
|    15|       Sales Manager|    10000.0|    20000.0|
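|    16|Sales Representative|     6000.0|    12000.0|
|    17|      Shipping Clerk|     2500.0|     5500.0|
|    18|         Stock Clerk|     2000.0|     5000.0|
|    19|       Stock Manager|     5500.0|     8500.0|
+------+--------------------+-----------+-----------+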
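Each `.add(name, type, nullable)` call appends one field, and the final `True` marks the column as nullable. The sketch below mimics how such a schema turns the raw text fields of a row into typed values. It makes simplifying assumptions: `SCHEMA` is a hypothetical list of `(name, converter)` pairs, and empty fields become `None`. The "Intern" row is also made up, to show a missing value.

```python
# Hypothetical plain-Python stand-in for the StructType above:
# (column name, converter) in the same order as the CSV columns
SCHEMA = [
    ("indice", int),
    ("job_title", str),
    ("salario_min", float),
    ("salario_max", float),
]

def apply_schema(fields):
    """Convert raw CSV fields to a dict of typed values; '' becomes None."""
    return {
        name: (None if raw == "" else convert(raw))
        for (name, convert), raw in zip(SCHEMA, fields)
    }

row = apply_schema("4,President,20000.00,40000.00".split(","))
print(row)
# {'indice': 4, 'job_title': 'President', 'salario_min': 20000.0, 'salario_max': 40000.0}

# A missing value is allowed because every field was declared nullable
partial = apply_schema(["20", "Intern", "", "1500.00"])
print(partial)
# {'indice': 20, 'job_title': 'Intern', 'salario_min': None, 'salario_max': 1500.0}
```

`FloatType` is a 32-bit float, while Python's `float` in the sketch is 64-bit and corresponds to Spark's `DoubleType`. For salaries, `DecimalType` keeps exact decimal values, and `DoubleType` at least offers more precision than `FloatType`.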

# Analysis with PySpark

## select

In [8]:
df.select('job_title', 'salario_min').show()

+--------------------+-----------+
|           job_title|salario_min|
+--------------------+-----------+
|   Public Accountant|     4200.0|
|  Accounting Manager|     8200.0|
|Administration As...|     3000.0|
|           President|    20000.0|
|Administration Vi...|    15000.0|
|          Accountant|     4200.0|
|     Finance Manager|     8200.0|
|Human Resources R...|     4000.0|
|          Programmer|     4000.0|
|   Marketing Manager|     9000.0|
|Marketing Represe...|     4000.0|
|Public Relations ...|     4500.0|
|    Purchasing Clerk|     2500.0|
|  Purchasing Manager|     8000.0|
|       Sales Manager|    10000.0|
|Sales Representative|     6000.0|
|      Shipping Clerk|     2500.0|
|         Stock Clerk|     2000.0|
|       Stock Manager|     5500.0|
+--------------------+-----------+
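`select` is a projection: it keeps the same rows and drops every column that is not listed. The sketch below shows the idea in plain Python over two of the rows of `df`. The helper `select` is hypothetical and not part of PySpark.

```python
# Two rows of df as plain dicts (copied from the output above)
rows = [
    {"indice": 1, "job_title": "Public Accountant", "salario_min": 4200.0, "salario_max": 9000.0},
    {"indice": 4, "job_title": "President", "salario_min": 20000.0, "salario_max": 40000.0},
]

def select(rows, *columns):
    """Keep only the requested columns, in the requested order (projection)."""
    return [{c: r[c] for c in columns} for r in rows]

projected = select(rows, "job_title", "salario_min")
print(projected)
# [{'job_title': 'Public Accountant', 'salario_min': 4200.0}, {'job_title': 'President', 'salario_min': 20000.0}]
```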



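## filter and/or where

`where()` is an alias of `filter()`. Both keep only the rows for which the condition is true, and both accept either a column expression or a SQL string such as `"salario_min >= 15000"`.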

In [9]:
df.filter(df.salario_min >=15000).show()

+------+--------------------+-----------+-----------+
|indice|           job_title|salario_min|salario_max|
+------+--------------------+-----------+-----------+
|     4|           President|    20000.0|    40000.0|
|     5|Administration Vi...|    15000.0|    30000.0|
+------+--------------------+-----------+-----------+
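The condition is evaluated row by row, and `>=` includes the boundary value. That is why `Administration Vice President`, with exactly 15000.0, is kept. The sketch below applies the same predicate in plain Python to a subset of the rows of `df`, as an illustration only.

```python
# (job_title, salario_min) pairs, a subset copied from the output above
jobs = [
    ("Public Accountant", 4200.0), ("Accounting Manager", 8200.0),
    ("Administration Assistant", 3000.0), ("President", 20000.0),
    ("Administration Vice President", 15000.0), ("Sales Manager", 10000.0),
]

# Same predicate as df.filter(df.salario_min >= 15000): keep rows where it is True
high = [title for title, sal_min in jobs if sal_min >= 15000]
print(high)  # ['President', 'Administration Vice President']
```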



## sum, min, max, mean

In [21]:
df.select('salario_min').where(df.salario_min>=0).groupby().sum().show()

+----------------+
|sum(salario_min)|
+----------------+
|        124800.0|
+----------------+
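Calling `groupby()` with no columns puts all rows into a single group, so `sum()` returns one grand total. The plain-Python check below adds up the 19 `salario_min` values shown earlier and reproduces the result of the cell. It is an illustration only.

```python
# All 19 salario_min values of df, in the order shown above
salario_min = [
    4200.0, 8200.0, 3000.0, 20000.0, 15000.0, 4200.0, 8200.0, 4000.0, 4000.0,
    9000.0, 4000.0, 4500.0, 2500.0, 8000.0, 10000.0, 6000.0, 2500.0, 2000.0, 5500.0,
]

# One group containing every row; the >= 0 condition mirrors the .where() in the cell
total = sum(v for v in salario_min if v >= 0)
print(total)  # 124800.0
```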



## agg

In [18]:
df.agg({'salario_min': 'sum'}).show()


+----------------+
|sum(salario_min)|
+----------------+
|        124800.0|
+----------------+
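The dictionary form of `agg` maps each column to one aggregate name (for example `'sum'`, `'min'`, `'max'` or `'avg'`). A column can therefore appear only once per call. To get several statistics for the same column at once, `pyspark.sql.functions` provides `min`, `max`, `avg` and others. The plain-Python sketch below computes the four statistics named in the earlier `sum, min, max, mean` heading, as a way to cross-check the Spark results. It is an illustration only.

```python
from statistics import mean

# All 19 salario_min values of df, in the order shown earlier
salario_min = [
    4200.0, 8200.0, 3000.0, 20000.0, 15000.0, 4200.0, 8200.0, 4000.0, 4000.0,
    9000.0, 4000.0, 4500.0, 2500.0, 8000.0, 10000.0, 6000.0, 2500.0, 2000.0, 5500.0,
]

# One entry per aggregate, all over the same column
summary = {
    "sum": sum(salario_min),
    "min": min(salario_min),
    "max": max(salario_min),
    "mean": mean(salario_min),
}
print(summary)  # sum 124800.0, min 2000.0, max 20000.0, mean about 6568.42
```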



## [join](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.join.html)
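A left join keeps every row of the left DataFrame (`df_emp` in the cell below). Each employee row receives the columns of the matching job, or nulls when no job matches. The plain-Python sketch below shows that rule. The job lookup is a subset of `df`. Employees 100 and 103 come from the employees table, while employee 999 with `job_id` 99 is hypothetical and shows the no-match case. The sketch assumes `indice` is unique; with duplicate keys, a Spark join would emit one output row per match.

```python
# Right side: jobs keyed by indice (subset of df)
jobs_by_id = {
    4: ("President", 20000.0, 40000.0),
    9: ("Programmer", 4000.0, 10000.0),
}

# Left side: (employee_id, job_id, salary); employee 999 is hypothetical
employees = [(100, 4, 24000.0), (103, 9, 9000.0), (999, 99, 1000.0)]

NO_MATCH = (None, None, None)  # plays the role of Spark's null columns

# Left join: every employee is kept; job columns are None when nothing matches
joined = [
    (salary, job_id) + jobs_by_id.get(job_id, NO_MATCH)
    for _emp_id, job_id, salary in employees
]
for row in joined:
    print(row)
# (24000.0, 4, 'President', 20000.0, 40000.0)
# (9000.0, 9, 'Programmer', 4000.0, 10000.0)
# (1000.0, 99, None, None, None)
```

With `how='inner'`, the third row would be dropped instead of being filled with nulls.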



In [23]:
# read the employees table
df_emp = spark.read.csv("data/employees.csv", header=True)
df_emp.show()

+-----------+-----------+----------+--------------------+------------+----------+------+--------+----------+-------------+
|employee_id| first_name| last_name|               email|phone_number| hire_date|job_id|  salary|manager_id|department_id|
+-----------+-----------+----------+--------------------+------------+----------+------+--------+----------+-------------+
|        100|     Steven|      King|steven.king@sqltu...|515.123.4567|1987-06-17|     4|24000.00|      null|            9|
|        101|      Neena|   Kochhar|neena.kochhar@sql...|515.123.4568|1989-09-21|     5|17000.00|       100|            9|
|        102|        Lex|   De Haan|lex.de haan@sqltu...|515.123.4569|1993-01-13|     5|17000.00|       100|            9|
|        103|  Alexander|    Hunold|alexander.hunold@...|590.423.4567|1990-01-03|     9| 9000.00|       102|            6|
|        104|      Bruce|     Ernst|bruce.ernst@sqltu...|590.423.4568|1991-05-21|     9| 6000.00|       103|            6|
|        105|   

In [36]:
# left join: keep every employee and attach the job whose indice equals job_id
df_emp.join(df, on=df_emp.job_id==df.indice, how='left')\
    .select(df_emp.salary, df_emp.job_id, df.job_title, df.salario_min, df.salario_max)\
    .show()

+--------+------+--------------------+-----------+-----------+
|  salary|job_id|           job_title|salario_min|salario_max|
+--------+------+--------------------+-----------+-----------+
|24000.00|     4|           President|    20000.0|    40000.0|
|17000.00|     5|Administration Vi...|    15000.0|    30000.0|
|17000.00|     5|Administration Vi...|    15000.0|    30000.0|
| 9000.00|     9|          Programmer|     4000.0|    10000.0|
| 6000.00|     9|          Programmer|     4000.0|    10000.0|
| 4800.00|     9|          Programmer|     4000.0|    10000.0|
| 4800.00|     9|          Programmer|     4000.0|    10000.0|
| 4200.00|     9|          Programmer|     4000.0|    10000.0|
|12000.00|     7|     Finance Manager|     8200.0|    16000.0|
| 9000.00|     6|          Accountant|     4200.0|     9000.0|
| 8200.00|     6|          Accountant|     4200.0|     9000.0|
| 7700.00|     6|          Accountant|     4200.0|     9000.0|
| 7800.00|     6|          Accountant|     4200.0|     