<a href="https://colab.research.google.com/github/CorsiIsa/Conhecendo_Spark/blob/master/ConhecendoSpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Conhecendo o SPARK



Precisamos fazer a instalação do pyspark <pre><code>```pip install pyspark ``` </code></pre><pre><code>```pip install findspark ``` </code></pre>

In [1]:
!pip install pyspark



In [2]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


### Importando a biblioteca

In [11]:
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, expr, lit, substring, concat, concat_ws, when, coalesce
from pyspark.sql import functions as F
from functools import reduce

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [5]:
df = spark.sql("""select 'spark' as hello """)
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



### Manipulando dados com o Spark

importando nossa base de dados

In [7]:
df = spark.read.csv('banklist.csv', sep = ',', inferSchema = True, header = True)

print('df.count  :', df.count())
print('df.col ct :', len(df.columns))
print('df.columns:', df.columns)

df.count  : 561
df.col ct : 6
df.columns: ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


### Usando sql com o SPARK

In [8]:
df.createOrReplaceTempView("banklist")

df_check = spark.sql('''select `Bank Name`, City, `Closing Date` from banklist''')
df_check.show(4, truncate=False)

+--------------------------------+-------------+------------+
|Bank Name                       |City         |Closing Date|
+--------------------------------+-------------+------------+
|The First State Bank            |Barboursville|3-Apr-20    |
|Ericson State Bank              |Ericson      |14-Feb-20   |
|City National Bank of New Jersey|Newark       |1-Nov-19    |
|Resolute Bank                   |Maumee       |25-Oct-19   |
+--------------------------------+-------------+------------+
only showing top 4 rows



### Operação basicas de DataFrame com o SPARK

In [13]:
df.describe().show()

+-------+--------------------+-------+----+-----------------+---------------------+------------+
|summary|           Bank Name|   City|  ST|             CERT|Acquiring Institution|Closing Date|
+-------+--------------------+-------+----+-----------------+---------------------+------------+
|  count|                 561|    561| 561|              561|                  561|         561|
|   mean|                NULL|   NULL|NULL|31685.68449197861|                 NULL|        NULL|
| stddev|                NULL|   NULL|NULL|16446.65659309965|                 NULL|        NULL|
|    min|1st American Stat...|Acworth|  AL|               91|      1st United Bank|    1-Aug-08|
|    max|               ebank|Wyoming|  WY|            58701|  Your Community Bank|    9-Sep-11|
+-------+--------------------+-------+----+-----------------+---------------------+------------+



In [14]:
df.describe('City', 'ST').show()

+-------+-------+----+
|summary|   City|  ST|
+-------+-------+----+
|  count|    561| 561|
|   mean|   NULL|NULL|
| stddev|   NULL|NULL|
|    min|Acworth|  AL|
|    max|Wyoming|  WY|
+-------+-------+----+



In [15]:
print('Total de linhas:', df.count())
print('Total de colunas:', len(df.columns))
print('Tipo de dados:', df.dtypes)
print('Schema:', df.schema)

Total de linhas: 561
Total de colunas: 6
Tipo de dados: [('Bank Name', 'string'), ('City', 'string'), ('ST', 'string'), ('CERT', 'int'), ('Acquiring Institution', 'string'), ('Closing Date', 'string')]
Schema: StructType([StructField('Bank Name', StringType(), True), StructField('City', StringType(), True), StructField('ST', StringType(), True), StructField('CERT', IntegerType(), True), StructField('Acquiring Institution', StringType(), True), StructField('Closing Date', StringType(), True)])


Aqui seria como visualizar a estrutura da nossa tabela no sql

In [16]:
print('df schema 1:')
df.printSchema()

df schema 1:
root
 |-- Bank Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- CERT: integer (nullable = true)
 |-- Acquiring Institution: string (nullable = true)
 |-- Closing Date: string (nullable = true)



### Removendo duplicatas

In [17]:
df = df.dropDuplicates()
print('df.count		:', df.count())
print('df.columns	:', df.columns)

df.count		: 561
df.columns	: ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


Podemos notar que o tamanho do DF segue o mesmo, sendo assim, não havia dados duplicados no nosso df.

### Selecionando colunas específicas

In [18]:
df2 = df.select(*['Bank Name', 'City'])
df2.show()

+--------------------+----------------+
|           Bank Name|            City|
+--------------------+----------------+
| First Bank of Idaho|         Ketchum|
|Amcore Bank, Nati...|        Rockford|
|        Venture Bank|           Lacey|
|First State Bank ...|           Altus|
|Valley Capital Ba...|            Mesa|
|Michigan Heritage...|Farmington Hills|
|Columbia Savings ...|      Cincinnati|
|       Fidelity Bank|        Dearborn|
|The Park Avenue Bank|        Valdosta|
|Western Commercia...|  Woodland Hills|
|        Syringa Bank|           Boise|
|Republic Federal ...|           Miami|
|Westside Communit...|University Place|
|   First United Bank|           Crete|
|HarVest Bank of M...|    Gaithersburg|
|            BankEast|       Knoxville|
|    Polk County Bank|        Johnston|
|Colorado Capital ...|     Castle Rock|
|         Access Bank|        Champlin|
|Pacific National ...|   San Francisco|
+--------------------+----------------+
only showing top 20 rows



In [20]:
#com o sinal de -, estamos falando "pegue todas as colunas menos essas"
col_l = list(set(df.columns)  - {'CERT','ST'})
#depois passamos nosso filtro no select
df2 = df.select(*col_l)
df2.show()

+----------------+------------+---------------------+--------------------+
|            City|Closing Date|Acquiring Institution|           Bank Name|
+----------------+------------+---------------------+--------------------+
|         Ketchum|   24-Apr-09|      U.S. Bank, N.A.| First Bank of Idaho|
|        Rockford|   23-Apr-10|          Harris N.A.|Amcore Bank, Nati...|
|           Lacey|   11-Sep-09| First-Citizens Ba...|        Venture Bank|
|           Altus|   31-Jul-09|         Herring Bank|First State Bank ...|
|            Mesa|   11-Dec-09| Enterprise Bank &...|Valley Capital Ba...|
|Farmington Hills|   24-Apr-09|       Level One Bank|Michigan Heritage...|
|      Cincinnati|   23-May-14| United Fidelity B...|Columbia Savings ...|
|        Dearborn|   30-Mar-12| The Huntington Na...|       Fidelity Bank|
|        Valdosta|   29-Apr-11|   Bank of the Ozarks|The Park Avenue Bank|
|  Woodland Hills|    5-Nov-10| First California ...|Western Commercia...|
|           Boise|   31-J

### Remomeando nossas colunas

In [21]:
df2 = df \
  .withColumnRenamed('Bank Name'            , 'bank_name') \
  .withColumnRenamed('Acquiring Institution', 'acq_institution') \
  .withColumnRenamed('Closing Date'         , 'closing_date') \
  .withColumnRenamed('ST'                   , 'state') \
  .withColumnRenamed('CERT'                 , 'cert') #\

df2.show()

+--------------------+----------------+-----+-----+--------------------+------------+
|           bank_name|            City|state| cert|     acq_institution|closing_date|
+--------------------+----------------+-----+-----+--------------------+------------+
| First Bank of Idaho|         Ketchum|   ID|34396|     U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|        Rockford|   IL| 3735|         Harris N.A.|   23-Apr-10|
|        Venture Bank|           Lacey|   WA|22868|First-Citizens Ba...|   11-Sep-09|
|First State Bank ...|           Altus|   OK| 9873|        Herring Bank|   31-Jul-09|
|Valley Capital Ba...|            Mesa|   AZ|58399|Enterprise Bank &...|   11-Dec-09|
|Michigan Heritage...|Farmington Hills|   MI|34369|      Level One Bank|   24-Apr-09|
|Columbia Savings ...|      Cincinnati|   OH|32284|United Fidelity B...|   23-May-14|
|       Fidelity Bank|        Dearborn|   MI|33883|The Huntington Na...|   30-Mar-12|
|The Park Avenue Bank|        Valdosta|   GA|19797|  B

### Adicionando colunas

In [22]:
df2 = df.withColumn('state', col('ST'))
df2.show()

+--------------------+----------------+---+-----+---------------------+------------+-----+
|           Bank Name|            City| ST| CERT|Acquiring Institution|Closing Date|state|
+--------------------+----------------+---+-----+---------------------+------------+-----+
| First Bank of Idaho|         Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|   ID|
|Amcore Bank, Nati...|        Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|   IL|
|        Venture Bank|           Lacey| WA|22868| First-Citizens Ba...|   11-Sep-09|   WA|
|First State Bank ...|           Altus| OK| 9873|         Herring Bank|   31-Jul-09|   OK|
|Valley Capital Ba...|            Mesa| AZ|58399| Enterprise Bank &...|   11-Dec-09|   AZ|
|Michigan Heritage...|Farmington Hills| MI|34369|       Level One Bank|   24-Apr-09|   MI|
|Columbia Savings ...|      Cincinnati| OH|32284| United Fidelity B...|   23-May-14|   OH|
|       Fidelity Bank|        Dearborn| MI|33883| The Huntington Na...|   30-Mar-12|   MI|

### Adicionando regras nas colunas

Iremos criar uma coluna onde ele só possa receber um valor fixo (ou seja, todos os registros terão o mesmo valor)

In [23]:
df2 = df.withColumn('country', lit('US'))
df2.show(2)

+--------------------+--------+---+-----+---------------------+------------+-------+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|country|
+--------------------+--------+---+-----+---------------------+------------+-------+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|     US|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|     US|
+--------------------+--------+---+-----+---------------------+------------+-------+
only showing top 2 rows



### Apagando colunas

In [24]:
df2 = df.drop('CERT')
df2.show(2)

+--------------------+--------+---+---------------------+------------+
|           Bank Name|    City| ST|Acquiring Institution|Closing Date|
+--------------------+--------+---+---------------------+------------+
| First Bank of Idaho| Ketchum| ID|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford| IL|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---+---------------------+------------+
only showing top 2 rows



apagando mais de uma coluna

In [25]:
df2 = df.drop(*['CERT', 'ST'])
df2.show(2)

+--------------------+--------+---------------------+------------+
|           Bank Name|    City|Acquiring Institution|Closing Date|
+--------------------+--------+---------------------+------------+
| First Bank of Idaho| Ketchum|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---------------------+------------+
only showing top 2 rows



### Filtrando dados

In [28]:
df2 = df.where(df['ST'] == 'NE')

#Pega valores entre um determinado intervalo
df3 = df.where(df['CERT'].between('1000','2000'))

#Pega onde o ST é NE ou IL
df4 = df.where(df['ST'].isin('NE','IL'))

print('df.count  :', df.count())
print('df2.count :', df2.count())
print('df3.count :', df3.count())
print('df4.count :', df4.count())

df.count  : 561
df2.count : 4
df3.count : 9
df4.count : 73


In [29]:

df2 = df.where((df['ST'] == 'NE') & (df['City'] == 'Ericson'))
df2.show(3)

+------------------+-------+---+-----+---------------------+------------+
|         Bank Name|   City| ST| CERT|Acquiring Institution|Closing Date|
+------------------+-------+---+-----+---------------------+------------+
|Ericson State Bank|Ericson| NE|18265| Farmers and Merch...|   14-Feb-20|
+------------------+-------+---+-----+---------------------+------------+



### Substituindo valores no nosso banco

In [30]:

df.show(2)
print('Estamos substituindo 7 por 17')
df.na.replace(7,17).show(2)

+--------------------+--------+---+-----+---------------------+------------+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|
+--------------------+--------+---+-----+---------------------+------------+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---+-----+---------------------+------------+
only showing top 2 rows

Estamos substituindo 7 por 17
+--------------------+--------+---+-----+---------------------+------------+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|
+--------------------+--------+---+-----+---------------------+------------+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---+-----+---------------------+------------+
only showing top 2 ro